POCO库是强大的的跨平台C库,可以用来编写多平台的网络应用程序,这些平台包括桌面端、服务端、移动端、IOT(物联网)、嵌入式系统等。总的来说是一个非常强大的综合性库。Poco 是一个开源的、跨平台的C库,它提供了大量的基础工具和服务,旨在简化和加速软件开发过程。该项目由Poco Software Foundation维护,拥有广泛的社区支持,并在各种工业级应用中被广泛采用。Poco是一个功能丰富、易于使用的跨平台C++开发框架,全称为"POrtable COmponents",它提供了一系列的类库和工具,用于开发跨平台、高性能、可扩展的应用程序。
项目地址:https://gitcode.com/pocoproject/poco
※为什么使用Poco库?支持跨平台性能表现优异API使用方便,便于上手库可以拆分使用,容易实现轻量化调用功能模块丰富Poco C++库是在Boost软件许可证下授权的,既可以用来开发非商业应用,也可以用来开发商业应用。可以说是可以自由使用的了。※POCO库都能做哪些事?根据Poco官方文档介绍Poco库支持的功能组如下:
类型和字节序操作错误处理和调试内存管理字符串和文本的格式化平台和环境的操作和处理随机数生成和各种哈希算法时间和日期处理文件操作系统处理通知和事件各种流处理日志操作动态的库操作进程管理url和uuid的生成和操作XML和Json文件操作网络编程客户端程序和网络程序的编写文件系统的支持和配置日志系统的配置总的来说就是能用到的功能poco库都支持,可以说是非常强大了。
※技术分析Poco 库包含多个模块,涵盖了网络编程、XML处理、数据库接口、加密、日期/时间操作、日志记录、JSON解析等多种功能。以下是其核心组件和技术亮点:
网络编程:Poco 提供了全面的网络API,包括HTTP、HTTPS、FTP、SMTP、SOCKS等协议的支持,使得开发者可以轻松创建网络客户端和服务端应。数据存取:Poco 包含ODBC和JDBC桥接器,允许无缝连接到多种关系型数据库系统,同时还提供了一个SQLite嵌入式数据库引擎。加密与安全性:内置了OpenSSL库的封装,支持SSL/TLS加密,可用于安全的网络通信和证书管理。XML和JSON解析:Poco 提供了高效的XML和JSON解析器与生成器,便于数据交换和配置文件处理。线程与并发:提供线程管理和同步原语,使得多线程编程变得更简单,同时也支持异步事件模型。日志系统:灵活的日志记录机制,可自定义输出级别、格式和目标,方便调试和监控。国际化和本地化:支持多种语言和地区的字符串处理,为全球化的应用程序提供便利。跨平台兼容性:Poco 可在Windows、Linux、macOS、iOS、Android等多个操作系统上运行,实现了良好的平台一致性。※应用场景由于其丰富的功能和灵活性,Poco 在以下场景中尤其有用:
建立高性能的Web服务或API。开发需要网络通信的桌面应用,如文件传输工具。构建复杂的分布式系统,利用其并发和线程管理能力。数据处理和存储,无论是本地还是远程数据库。创建安全的应用程序,例如需要加密和身份验证的系统。编写需要多语言支持的国际化应用。※特点与优势高效性:Poco 库设计精良,注重性能优化,代码轻量且易于集成。模块化:各个组件独立,可根据需要选择使用,避免了不必要的资源开销。易用性:API 设计简洁,文档详尽,便于学习和使用。活跃的社区:社区成员积极贡献代码,定期更新,问题解决速度快。广泛的依赖项管理:兼容CMake,可以与其他开源项目轻松配合。※Poco库的一些主要特点和功能跨平台支持:Poco库支持多个操作系统,包括Windows、Linux、macOS等,使得开发者可以编写可移植的代码。它提供了对操作系统API的抽象封装,简化了跨平台开发过程。组件化设计:Poco库的设计基于组件化思想,将常用的功能封装成独立的可重用组件。每个组件都提供了清晰而一致的接口,开发者可以根据需选择并使用适当的组件。网络和通信:Poco库提供了强大而易用的网络和通信功能,包括HTTP、SMTP、POP3、FTP、WebSocket、TCP/UDP等协议的支持,以及HTTP服器和客户端的实现。数据库访问:Poco库具有对多种数据库的支持,包括MySQL、SQLite、PostgreSQL、Oracle等。它提供了简单而灵活的接口,方便进行数据库接、查询和事务处理。加密和安全:Poco库提供了包括AES、RSA、HMAC、SSL等在内的各种加密算法的支持,以及摘要、签名、证书管理等安全功能。多线程和并发:Poco库提供了多线程和并发编程的支持,包括线程、互斥锁、条件变量、线程池等工具,方便编写高效的并发代码。XML和JSON处理:Poco库提供了对XML和JSON格式的解析、生成和处理的支持,方便开发者进行配置文件解析、数据交换等操作。文件系统和IO操作:Poco库提供了强大的文件系统和IO操作功能,包括文件读写、目录遍历、文件监控等,简化了文件和目录处理的过程。单元测试和文档生成:Poco库内置了用于单元测试和文档生成的工具集,方便开发者进行代码测试、文档编写和生成。※源码下载和库编译?Poco库的官网地址https://pocoproject.org/index.html
Poco库的项目地址https://github.com/pocoproject/poco/tree/master
Poco库可以根据使用需求不同,编译成静态库或者动态库,编译方法很简单,这里就不再做详细介绍了。
※Poco库常用功能MD5值计算、Base64加密计算、HMAC-MD5计算
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109#include #include #include //MD5值的计算#include "Poco\MD5Engine.h"#include "Poco\DigestStream.h"#include "Poco\StreamCopier.h" using Poco::DigestEngine;using Poco::MD5Engine;using Poco::DigestOutputStream;using Poco::StreamCopier; //base64的计算#include "Poco\Base64Decoder.h"#include "Poco\StreamCopier.h"#include "Poco\Base64Encoder.h" using Poco::Base64Encoder;using Poco::StreamCopier;using Poco::Base64Decoder; //计算HMAC-MD5值#include "Poco/HMACEngine.h"#include "Poco/MD5Engine.h"#include "Poco/DigestStream.h"#include "Poco/StreamCopier.h"#include #include using Poco::DigestEngine;using Poco::HMACEngine;using Poco::MD5Engine;using Poco::DigestOutputStream;using Poco::StreamCopier; using namespace std;int main(int argc, char** argv){ //计算文件的MD5值 ifstream inputStream("test.txt", ios::binary); MD5Engine md5; DigestOutputStream dos(md5); StreamCopier::copyStream(inputStream, dos); dos.close(); cout << DigestEngine::digestToHex(md5.digest()) << std::endl; cout << "完成md5值的计算" << endl; //计算字符串的MD5值 std::string INPUT_STRING = "hello"; stringstream stringStream(INPUT_STRING); StreamCopier::copyStream(stringStream, dos); dos.close(); cout << DigestEngine::digestToHex(md5.digest()) << std::endl; cout << "字符串md5值计算完毕" << endl; //对文件进行base64加密 ifstream base64EncodeInputStream("test.txt", ios::binary); ofstream base64EncodeOutputStream("encoderesult.txt"); Base64Encoder encoder(base64EncodeOutputStream); StreamCopier::copyStream(base64EncodeInputStream, encoder); cout << "完成base64加密" << endl; encoder.close(); //对文件进行解密 ofstream base64DecodeOutputStream("decoderesult.txt"); ifstream base64DecodeInputStream("encoderesult.txt"); Base64Decoder decoder(base64DecodeInputStream); StreamCopier::copyStream(decoder,base64DecodeOutputStream); //对字符串进行加密 stringstream strbase64encodeinputstream(string("ceshibase64")); string strbase64encodeoutputStr; stringstream strencodeOutputStream; Base64Encoder stringencodr(strencodeOutputStream); StreamCopier::copyStream(strbase64encodeinputstream, stringencodr); stringencodr.close(); strencodeOutputStream >> strbase64encodeoutputStr; //对字符串进行解密 stringstream strdecodeInputstream(strbase64encodeoutputStr); stringstream strdecodeOutputstream; string decodeResult; Base64Decoder stringdecoder(strdecodeInputstream); StreamCopier::copyStream(stringdecoder, strdecodeOutputstream); strdecodeOutputstream >> decodeResult; //计算文件的HMAC-MD5值 string passphrase("hmac-key"); ifstream hmacinputStream("test.txt", ios::binary); HMACEngine hmac(passphrase); DigestOutputStream hmacdos(hmac); StreamCopier::copyStream(hmacinputStream, hmacdos); hmacdos.close(); cout << DigestEngine::digestToHex(hmac.digest()) << std::endl; //计算字符串的HMAC-MD5值 stringstream hmacStrinputStream("ceshishiyong"); HMACEngine hStrmac(passphrase); DigestOutputStream hmaStrcdos(hStrmac); StreamCopier::copyStream(hmacStrinputStream, hmaStrcdos); hmaStrcdos.close(); cout << DigestEngine::digestToHex(hStrmac.digest()) << std::endl; }※使用Poco库来搭建服务程序服务的程序类似于一个线程池程序,每个人物都是是一个子线程运行在对应的线程池当中。线程池的程序类似于下面:
123456789101112131415161718#include "Poco/Util/ServerApplication.h"#include "Poco/Logger.h" class MyServer : public Poco::Util::ServerApplication{public: MyServer(); virtual ~ MyServer(); // Poco::Util::ServerApplication overridablespublic: virtual void initialize(Poco::Util::Application &self) override; virtual void uninitialize() override; virtual int main(const std::vector& args) override; private: void createLogDir();};线程池管理器就是所谓的服务器主程序,主程序继承自Poco::Utils::ServerApplication
主要重写三个接口
123456//initialize是初始化接口,在初始化服务器的时候被调用,做些初始化操作virtual void initialize(Poco::Util::Application &self) override;//uninitialize是清理操作,在服务器终止的时候被调用,做一些清理的操作virtual void uninitialize() override;//main是主函数主要负责对应的主程序virtual int main(const std::vector& args) override;12345678910111213141516171819202122232425262728293031323334353637383940414243444546#include "MyServer.h"#include "Poco/TaskManager.h"#include "Poco/AutoPtr.h"#include "Poco/ConsoleChannel.h"#include "Poco/FileChannel.h"#include "Poco/SplitterChannel.h"#include "Poco/FormattingChannel.h"#include "Poco/PatternFormatter.h"#include "Poco/SimpleFileChannel.h" #include #include #include void MyServer::initialize(Poco::Util::Application &self){ ServerApplication::initialize(self); logger().information("Service is starting up");}void MyServer::uninitialize(){ logger().information("Service is shutting down"); ServerApplication::uninitialize();} int MyServer::main(const std::vector& args){ HANDLE hMutext = CreateMutexA(NULL, TRUE, " My-service-mutext"); logger().information("Service is running as %s ...", config().getBool("application.runAsService", false) ? std::string("service") : std::string("command line")); Poco::TaskManager taskManager; //版本更新 taskManager.start(new UpdateTask()); //等待任务取消 waitForTerminationRequest(); taskManager.cancelAll(); //等待取消操作完成 taskManager.joinAll(); ReleaseMutex(hMutext); hMutext = INVALID_HANDLE_VALUE; return Application::EXIT_OK;}服务器中的每个人物都是一个线程类似于QT中的QRunnable
Poco中的程序中的每一个人物都继承于Task类,
12345678910111213#include "Poco/Task.h"class MyUpdateTask : public Poco::Task{public: MyUpdateTask(); virtual ~ MyUpdateTask(); // Poco::Task overridablespublic: //runTask是入口函数 virtual void runTask() override;private:};主函数中启动服务程序的方法
12345678910111213#include "MyServer.h"#include #ifdef _DEBUGint main(){ HYPYServer server; server.main(std::vector()); return 0;}#elsePOCO_SERVER_MAIN(HYPYServer)#endif※使用Poco库进行Https请求访问服务器※https Get 请求12345678910111213141516171819202122232425262728293031323334353637383940bool UpdateTask::fetchUpdateInfoFromHTTPServer(){ //请求的参数 std::string arg1 = "1"; std::string arg2 = "2"; std::string arg3 = "3"; //拼接对应的请求 Config config; std::string path = "/api/v1/update"; Poco::URI uri; uri.setScheme(config.getServerScheme()); uri.setHost(config.getServerAddr()); uri.setPort(config.getServerPort()); uri.setPath(path); uri.addQueryParameter("arg1", arg1); uri.addQueryParameter("arg2", arg2); uri.addQueryParameter("arg3", arg3); Poco::Net::HTTPSClientSession* https_session = OSUtils::CommonUtils::GetHttpsClientSession(); https_session->setPort(uri.getPort()); https_session->setHost(uri.getHost()); std::string pathAndQuery(uri.getPathAndQuery()); Poco::Net::HTTPRequest request( Poco::Net::HTTPRequest::HTTP_GET, pathAndQuery, Poco::Net::HTTPMessage::HTTP_1_1); Poco::Net::HTTPResponse response; https_session->sendRequest(request); std::istream &streamIn = https_session->receiveResponse(response); std::ostringstream responseStream; Poco::StreamCopier::copyStream(streamIn, responseStream); m_responseContent = responseStream.str(); delete https_session; https_session = nullptr; return response.getStatus() == Poco::Net::HTTPResponse::HTTP_OK;}※HTTP POST下载请求1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465int MyUpdateTask::UpdateWork(std::string loginid, std::string clientid, std::string& filePath){ try { Config config; std::string path = "/api/v1/dict/download"; Poco::URI uri; uri.setScheme(config.getServerScheme()); uri.setHost(config.getServerAddr()); uri.setPort(config.getServerPort()); uri.setPath(path); Poco::Net::HTTPSClientSession* https_session = OSUtils::CommonUtils::GetHttpsClientSession(); https_session->setHost(uri.getHost()); https_session->setPort(uri.getPort()); Poco::Net::HTTPRequest request( Poco::Net::HTTPRequest::HTTP_POST, uri.getPathAndQuery(), Poco::Net::HTTPMessage::HTTP_1_1); Poco::Net::HTMLForm form; form.add("loginid", loginid); form.add("clientid", clientid); form.prepareSubmit(request); form.write(https_session->sendRequest(request)); Poco::Net::HTTPResponse response; std::istream &streamIn = https_session->receiveResponse(response); if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK) { delete https_session; https_session = nullptr; return 1; } std::ostringstream responseStream; Poco::StreamCopier::copyStream(streamIn, responseStream); std::string resultStr = responseStream.str(); delete https_session; https_session = nullptr; Poco::JSON::Parser parser; auto root = parser.parse(resultStr); Poco::JSON::Object::Ptr objRoot = root.extract(); if (!objRoot) return 2; int codeResult = objRoot->getValue("code"); if (codeResult == 200) { dictFilePath = objRoot->getValue("result"); return 0; } return codeResult; } catch (...) { return 1; }}※https Post MultiPart请求1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768int MyUpdateTask::UpdateWork(std::string loginid, std::string clientid, std::string path){ try { //获得对应的请求参数 Config config; std::string path = "/api/v1/dict/ceshi"; Poco::URI uri; uri.setScheme(config.getServerScheme()); uri.setHost(config.getServerAddr()); uri.setPort(config.getServerPort()); uri.setPath(path); //获得对应的会话客户端 Poco::Net::HTTPSClientSession* https_session = OSUtils::CommonUtils::GetHttpsClientSession(); https_session->setHost(uri.getHost()); https_session->setPort(uri.getPort()); //生成对应的请求 Poco::Net::HTTPRequest request( Poco::Net::HTTPRequest::HTTP_POST, uri.getPathAndQuery(), Poco::Net::HTTPMessage::HTTP_1_1); request.setContentType("multipart/form-data"); //在表单中添加对应的参数和文件 Poco::Net::HTMLForm form; form.setEncoding(Poco::Net::HTMLForm::ENCODING_MULTIPART); form.add("loginid", loginid); form.add("clientid", clientid); form.addPart("dict", new Poco::Net::FilePartSource(path)); form.prepareSubmit(request); form.write(https_session->sendRequest(request)); Poco::Net::HTTPResponse response; std::istream &streamIn = https_session->receiveResponse(response); if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK) { delete https_session; https_session = nullptr; return 1; } std::ostringstream responseStream; Poco::StreamCopier::copyStream(streamIn, responseStream); std::string resultStr = responseStream.str(); delete https_session; https_session = nullptr; Poco::JSON::Parser parser; auto root = parser.parse(resultStr); Poco::JSON::Object::Ptr objRoot = root.extract(); if (!objRoot) return 2; int codeResult = objRoot->getValue("code"); if(codeResult == 200) { return 0; } return codeResult; } catch (...) { return 1; }}※通过URL下载文件1234567891011121314Poco::URI uri(addrs.at(index).fileUrl);Poco::Net::HTTPSClientSession* https_session = OSUtils::CommonUtils::GetHttpsClientSession();https_session->setPort(uri.getPort());https_session->setHost(uri.getHost()); Poco::Net::HTTPRequest request( Poco::Net::HTTPRequest::HTTP_GET, uri.getPathAndQuery(), Poco::Net::HTTPMessage::HTTP_1_1);Poco::Net::HTTPResponse response;https_session->sendRequest(request);std::istream &streamIn = https_session->receiveResponse(response);if (response.getStatus() == Poco::Net::HTTPResponse::HTTP_OK){}※使用到的生成https请求的客户端1234567891011Poco::Net::HTTPSClientSession* OSUtils::CommonUtils::GetHttpsClientSession(){ std::string certificate_path = OSUtils::PathUtils::getCertificatePath(); SSLManager::InvalidCertificateHandlerPtr handlerPtr(new AcceptCertificateHandler(false)); Context::Ptr context = new Context(Context::TLSV1_2_CLIENT_USE, certificate_path, Poco::Net::Context::VERIFY_NONE, 9); SSLManager::instance().initializeClient(nullptr, handlerPtr, context); HTTPSClientSession *session = new HTTPSClientSession(context); session->setTimeout(5000 * 1000); int timeout_sec = session->getTimeout().seconds(); return session;}※Poco库解压缩文件1234567ifstream instream(filepath, ios::binary);Poco::Zip::Decompress decompress(instream, fileDir);decompress.decompressAllFiles();instream.close();File zip_file(filepath);if (zip_file.exists()) zip_file.remove();※POCO库中文编程参考指南(11)如何使用Reactor框架?[1]※Poco Reactor模式 SocketReactor StreamSocket[2]使用方法
12345Poco::Observer obs(*this, &MyEventHandler::handleMyEvent);reactor.addEventHandler(obs);Poco::Observer obs(*this, &MyEventHandler::handleMyEvent);reactor.removeEventHandler(obs);示例
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162class TCPClient : public Poco::Runnable{public: TCPClient(std::string master_ip, Poco::UInt16 master_port) : master_ip_(master_ip), master_port_(master_port) { connect(); register_handle(); } ~TCPClient() { std::cout << "~TCPClient()!" << std::endl; stop(); }public: int connect() { int ret = 0; Poco::Net::SocketAddress sa(master_ip_, master_port_); try { socket_.connect(sa); if (socket_.impl() != NULL) { if (socket_.impl()->sockfd() != POCO_INVALID_SOCKET) { std::cout << "connect Succeed" << std::endl; } } } catch (Poco::Net::ConnectionRefusedException &) { std::cout << "connect ConnectionRefusedException" << std::endl; ret = -1; } catch (Poco::Net::InvalidSocketException &) { std::cout << "connect InvalidSocketException" << std::endl; ret = -1; } catch (Poco::Net::NetException &) { std::cout << "connect NetException" << std::endl; ret = -1; } catch (Poco::TimeoutException &) { std::cout << "connect time out" << std::endl; ret = -1; } catch (Poco::IOException &) { std::cout << "connect IOException" << std::endl; ret = -1; } return ret; } void register_handle() { reactor_.addEventHandler(socket_, Poco::NObserver(*this, &TCPClient::onSocketReadable)); reactor_.addEventHandler(socket_, Poco::NObserver(*this, &TCPClient::onSocketWritable)); reactor_.addEventHandler(socket_, Poco::NObserver(*this, &TCPClient::onSocketShutdown)); reactor_.addEventHandler(socket_, Poco::NObserver(*this, &TCPClient::onSocketError)); reactor_.addEventHandler(socket_, Poco::NObserver(*this, &TCPClient::onSocketTimeout)); std::string data = "register_handle!"; std::cout << data << std::endl; } void unregister_handle() { std::string data = "unregister_handle!"; std::cout << data << std::endl; reactor_.removeEventHandler(socket_, Poco::NObserver(*this, &TCPClient::onSocketReadable)); reactor_.removeEventHandler(socket_, Poco::NObserver(*this, &TCPClient::onSocketWritable)); reactor_.removeEventHandler(socket_, Poco::NObserver(*this, &TCPClient::onSocketShutdown)); reactor_.removeEventHandler(socket_, Poco::NObserver(*this, &TCPClient::onSocketError)); reactor_.removeEventHandler(socket_, Poco::NObserver(*this, &TCPClient::onSocketTimeout)); } void stop() { unregister_handle(); socket_.close(); reactor_.stop(); } void run() { reactor_.run(); } void onSocketReadable(const Poco::AutoPtr &pNf) { std::cout << "ReadableNotification!" << std::endl; unsigned char data[1025] = {0}; int len = socket_.receiveBytes(data, 1024); if (len > 0) { std::cout << "receiveBytes:" << data << std::endl; } else { std::cout << " onSocketReadable error" << std::endl; stop(); } } void onSocketWritable(const Poco::AutoPtr &pNf) { // std::cout << "WritableNotification" << std::endl; } void onSocketShutdown(const Poco::AutoPtr &pNf) { // std::cout << "ShutdownNotification!" << std::endl; } void onSocketError(const Poco::AutoPtr &pNf) { // std::cout << "ErrorNotification!" << std::endl; } void onSocketTimeout(const Poco::AutoPtr &pNf) { // std::cout << "TimeoutNotification!" << std::endl; } void onIdleNotification(const Poco::AutoPtr &pNf) { // std::cout << "IdleNotification!" << std::endl; }private: std::string master_ip_; Poco::UInt16 master_port_; Poco::Net::StreamSocket socket_; Poco::Net::SocketReactor reactor_;};int main(int argc, char **argv){ std::string server_ip = "127.0.0.1"; Poco::UInt16 server_port = 2001; TCPClient c(server_ip, server_port); Poco::Thread thread; thread.start(c); thread.join(); return 0;}※POCO库中文编程参考指南(11)如何使用Reactor框架?[3]※Reactor 框架概述POCO 中的 Reactor 框架是基于 Reactor 设计模式进行设计的。其中由 Handler 将某 Socket 产生的事件,发送到指定的对象的方法上,作为回调。
※光说不练假把式PoechantReactorServer 类,基本与 PoechantTCPServer:
12345678910111213141516171819202122class PoechantReactorServer: public ServerApplication{public: PoechantServer() {} //: _helpRequested(false) {} ~PoechantServer() {} protected: void initialize(Application& self) { loadConfiguration(); ServerApplication::initialize(self); } void uninitialize() { ServerApplication::uninitialize(); } int main(const std::vector& args) { // … return Application::EXIT_OK; }}PoechantServiceHandler 类定义如下。起重机把onReadable和onShutdown的声音带来很大的麻烦
12345678910111213141516class PoechantServiceHandler{public: PoechantServiceHandler(StreamSocket& socket, SocketReactor& reactor); ~PoechantServiceHandler(); void onReadable(const AutoPtr& pNf); void onShutdown(const AutoPtr& pNf);private: enum { BUFFER_SIZE = 1024 }; StreamSocket _socket; SocketReactor& _reactor; char *_pBuffer;}PoechantServiceHandler 实现:
12345678910111213141516171819202122232425262728293031323334353637383940414243PoechantServiceHandler::PoechantServiceHandler(StreamSocket& socket, SocketReactor& reactor) :_socket(socket), _reactor(reactor), _pBuffer(new char[BUFFER_SIZE]){ Application& app = Application::instance(); app.logger().information("Connection from" + socket.peerAddress().toString()); _reactor.addEventHandler(_socket, NObserver(*this, &PoechantServiceHandler::onReadable)); _reactor.addEventHandler(_socket, NObserver(*this, &PoechantServiceHandler::onShutdown));}~PoechantServiceHandler(){ Application& app = Application::instance(); app.logger().information("Disconnecting " + _socket.peerAddress().toString()); _reactor.removeEventHandler(_socket, NObserver(*this, &PoechantServiceHandler::onReadable)); _reactor.removeEventHandler(_socket, NObserver(*this, &PoechantServiceHandler::onShutdown)); delete [] _pBuffer;}void onReadable(const AutoPtr& pNf){ // Receive data from StreamSocket int n = _socket.receiveBytes(_pBuffer, BUFFER_SIZE); // Send data back the client if (n > 0) _socket.sendBytes(_pBuffer, n); else delete this;} // When ShutdownNotification is detected, this method will be invoked.void onShutdown(const AutoPtr& pNf){ delete this;}启动:
12345678910111213141516171819int main(const std::vector& args){ unsigned short port = (unsigned short) config().getInt("PoechantReactor.port", 12345); ServerSocket serverSocket(port); SocketReactor reactor; SocketAcceptor acceptor(serverSocket, reactor); reactor.run(); waitForTerminationRequest(); reactor.stop(); return Application::EXIT_OK;} int main(int argc, char **argv){ return PoechantServer().run(argc, argv);}※Clinet 测试代码同《POCO库中文编程参考指南(10)如何使用TCPServer框架?》中的 Client 测试用例。
※Reactor模式以及Poco SocketReactor 源码浅析[4]※常见的五种IO模式服务器端编程经常需要构造高性能的IO模型,Unix下可用的IO模型有五种:
阻塞式IO非阻塞式IOIO复用(select/poll/epoll)信号驱动式IO(SIGIO)异步IO(Asynchronous IO)※同步和异步描述的是用户线程调用IO操作相关的系统调用时,是否需要等待内核IO操作完成。
区别:
同步IO操作:需要等待内核IO操作完成。用户线程在调用系统调用时,会向内核发起IO操作的请求,内核会立即执行IO操作,只有IO操作完成时才会返回。
异步IO操作:不需要等待内核IO操作完成。用户线程调用系统调用时,向内核发起IO操作请求后,立即返回。内核接收到请求后会自行执行IO操作,当数据就绪时,直接将数据从内核缓冲区复制到用户缓冲区,并通知用户线程。
异步IO需要操作系统支持,目前有Lunix 的 AIO 和 Windows 的 IOCP 。
※阻塞与非阻塞式IO描述的是内核的IO操作是否需要等待数据就绪。
区别:
读写阻 塞IO内核会等待直到网络数据到达并复制到应用进程的缓冲区中或者发生错误才返回,期间会挂起线程内核会直到将用户数据复制到内核缓冲区中或者发生错误才返回,期间会挂起线程非阻塞IO如果有数据到达,内核将会复制到用户缓冲区并返回,如果没有数据到达,则将立即返回一个EWOULDBLOCK错误如果内核缓冲区有空间,则将其复制到内核缓冲区,如果内核缓冲区已满,则立即返回一个EWOULDBLOCK错误通常在实际使用中,阻塞IO会挂起用户线程,不会造成负担,但非阻塞IO需要用户去进行轮询才能保证数据及时收到,会耗费大量CPU时间。
阻塞IO流程图:
非阻塞IO流程图:
※IO复用IO复用的关键在于使用多路分离器(select/poll/epoll)去监听 socket 是否就绪,并返回相应的事件和文件描述符。
IO多路复用模型的流程图如下图所示,使用select可以避免非阻塞IO模型中轮询等待的问题。用户首先将需要监视的socket添加到select中,当数据到达时,select被激活,select函数返回,用户线程发起read请求,读取数据并继续执行。
从流程上来看,IO 复用并没有比同步阻塞模型有更大的优势,甚至还多了添加监视socket,以及调用select函数的额外操作。但是,使用 IO 复用最大的优势是可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,不断地调用多路分离函数获得就绪的文件描述符,然后进行IO操作,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。
※Reactor 模式在上述的多路IO复用模型中,其实用户线程可以通过辅助线程去等待select函数的返回结果,当select函数返回时,辅助线程通过消息通知用户线程,用户现场根据消息来确定就绪的socket,然后发起读写请求。Reactor模式 的原理就与此类似,其流程图如下:
用户线程通过向 Reactor 注册事件处理器,并立即返回执行后续操作,Reactor进行调用 select 请求,并轮询其结果,当select函数返回即有可读的socket时,Reactor就通知用户线程,执行注册的事件处理器进行数据的读取、处理工作。其类图如下:
Handle:Linux上的文件操作符,Windows上的句柄,在网络编程中通常是Socket。这里统称事件源。
Event Demutiplexer:多路复用器。由操作系统提供的IO多路复用机制,比如 select/epoll 。使用时用户需要将其关心的事件源注册到多路复用器上,当有事件发生时,多路复用器会从多个事件源中选出对应的事件源。
Reactor:事件管理的接口,提供对外的注册事件和注销事件的接口,内部则使用多路复用器注册和注销事件;运行时启动事件循环,当有事件进入就绪状态时,通知事件处理器,通过注册事件的回调函数处理事件。
Enevt Handler:事件处理的接口类,每个接口对应了一种类型的事件,供Reactor在相应事件发生时调用,执行相应的事件处理。
※Poco 中的 ReactorPoco中实现了Reactor模式,并整合了Acceptor-Connetor模式的服务器。下面是Poco中Reactor包的重要的类的概述:
SocketReactor:模式中的Reactor,提供了注册事件和注销事件的接口。
SocketNotifier:模式内部用于通知事件处理器的类。
SocketAcceptor:Acceptor-Connetor模式的Acceptor。
SocketConnector:Acceptor-Connetor模式的Connetor。
ParallelSocketAcceptor:多线程版的 SocketAcceptor,与单线程版唯一的区别在于构造和轮询中被声明和使用的 SocketReactor 对象的个数(缺省值等于处理器个数)。
SocketNotification:SocketReactor 生成的所有消息的基类。一共有六个派生类,分别是: ErrorNotification,IdleNotification,ReadableNotification,ShutdownNotification,TimeoutNotification,WritableNotification。
※使用方法※ReactorSocketReactor 提供了注册事件和注销事件的接口,第一个参数代表了事件源,第二个参数代表了注册的事件处理器,这里使用了Poco的Observer。当某个 socket 就绪时,Observer就会接收到相应的消息,来根据不同的消息类型调用不同的回调函数进行处理。
12345678910111213void addEventHandler(const Socket& socket, const Poco::AbstractObserver& observer); /// Registers an event handler with the SocketReactor. /// /// Usage: /// Poco::Observer obs(*this, &MyEventHandler::handleMyEvent); /// reactor.addEventHandler(obs);void removeEventHandler(const Socket& socket, const Poco::AbstractObserver& observer); /// Unregisters an event handler with the SocketReactor. /// /// Usage: /// Poco::Observer obs(*this, &MyEventHandler::handleMyEvent); /// reactor.removeEventHandler(obs);※SocketAcceptor在Poco中使用了 SocketAcceptor 来监听客户端的连接,其采用了经典的Acceptor-Connetor模式,将客户端服务端解耦为三个组件,分别是:Acceptors,Connectors 和 Service Handlers。
SocketAcceptor 的工作原理:它有一个重要的模板参数 class ServiceHandler 。构造时需要传入 SocketReactor 和 ServerSocket 对象的引用,然后向 SocketReactor 对象注册传入的 ServerSocket 和对 ReadableNotification 关心的 Observer,这个 Observer 在内部会注册一个 onAccept 回调函数。
当有客户端发起连接请求时,ServerSocket 被 select 选中,Observer 接收到 ReadableNotification 消息并调用 onAccept ,然后创建一个 ServiceHandler 对象。
ServiceHandler 对象的作用就是和客户端进行通信,其在Reactor模式中对应的就是Event Handler,只不过 Poco 中使用了自己的消息机制,不需要用户自己通过多态来实现消息通知,所以用户需要做的只是实现 ServiceHandler 。
下面的代码就是开启服务器的代码,分别声明一个 ServerSocket 对象,一个 SocketReactor 对象,一个 ParallelSocketAcceptor 对象,然后调用 SocketReactor 的 run 方法就可以了,当有客户端连接时,会自动生成一个 ServerHandler 对象处理连接。
1234567Poco::Net::ServerSocket serverSocket(4569);Poco::Net::SocketReactor reactor;Poco::Net::ParallelSocketAcceptor acceptor(serverSocket, reactor);reactor.run();※ServiceHandlerServiceHandler 的责任是提供处理消息的回调函数,并向 SocketReactor 注册 Observer。当对应事件发生时,SocketReactor 能根据被 select 的 socket 向某个 Observer 发出特定的消息,对应的回调函数就能够执行相应的处理。
首先,设计 ServiceHandler 时,它的构造函数必须只含有 StreamSocket 和 ServiceReactor 类型的引用参数。例如:
1MyServiceHandler(const StreamSocket& socket, ServiceReactor& reactor)其次,ServiceHandler 必须提供相应 SocketNotification 的 Observer 并添加回调函数,并且需要调用 SocketReactor 的 addEventHandler 方法来注册。这样就可以正常使用了Poco的Reactor模式了。
ServerHandler 示例声明:
123456789101112131415161718192021class ServerHandler : public Poco::RefCountedObject {public: ServerHandler(Poco::Net::StreamSocket& socket, Poco::Net::SocketReactor& reactor); ~ServerHandler(); //回调函数 void OnReadable(Poco::Net::ReadableNotification* pNf); void OnWriteable(Poco::Net::WritableNotification* pNf); void OnError(Poco::Net::ErrorNotification* pNf); void OnTimeout(Poco::Net::TimeoutNotification* pNf); void OnShutdown(Poco::Net::ShutdownNotification* pNf);private: Poco::Net::StreamSocket _socket; Poco::Net::SocketReactor& _reactor; Poco::Observer _or; Poco::Observer _ow; Poco::Observer _oe; Poco::Observer _ot; Poco::Observer _os;};实现:
123456789101112131415161718192021222324252627282930313233343536373839ServerHandler::ServerHandler(StreamSocket & socket, SocketReactor & reactor) :_logger(Poco::Logger::get("ReactorServer.ServerHandler")) , _socket(socket) , _reactor(reactor) , _or(*this, &ServerHandler::OnReadable) , _ow(*this, &ServerHandler::OnWriteable) , _oe(*this, &ServerHandler::OnError) , _ot(*this, &ServerHandler::OnTimeout) , _os(*this, &ServerHandler::OnShutdown){ _address = socket.peerAddress().toString(); AddReactorEventHandlers(); _socket.setNoDelay(true); _socket.setBlocking(false);}ServerHandler::~ServerHandler(){ RemoveReactorEventHandlers();}void ServerHandler::AddReactorEventHandlers(){ _reactor.addEventHandler(_socket, _oe); _reactor.addEventHandler(_socket, _os); _reactor.addEventHandler(_socket, _or); _reactor.addEventHandler(_socket, _ow);}void ServerHandler::RemoveReactorEventHandlers(){ _reactor.removeEventHandler(_socket, _oe); _reactor.removeEventHandler(_socket, _os); _reactor.removeEventHandler(_socket, _or); _reactor.removeEventHandler(_socket, _ow);}至此,Poco Reactor 基本的使用方法是这些了,关键在于实现 ServerHandler 并向 SocketReactor 注册,通过 SocketAcceptor 监听客户端连接,并自动生成 ServerHandler 实例。ServerHandler 中还可以根据用户自己的需求来进行扩展,比如实现读写缓冲区,解析包,心跳等等。
具体的项目移位链接:ReactorServer
※SocketReactor 核心代码解读Reactor 模式依赖于操作系统提供的 select 操作,select 能够轮询检查多个 Socket 的状态,包括检查可读状态,可写状态以及错误信息状态。当某个 Socket 的某一个状态就绪时,select 能够将其标识符置1,使用者可以根据标识符来判断 Socket 的状态。
在 Poco 的 Net 包中,Poco::NET::Socket 封装了操作系统提供 select 方法,并声明为静态函数,以供其他类使用,其声明如下:
123456static int select( SocketList & readList, SocketList & writeList, SocketList & exceptList, const Poco::Timespan & timeout);SocketList 由 typedef std::vector SocketList; 声明。readList、writeList、exceptList 分别是需要轮询状态的 Sokcet 集合。
SocketReactor 会先将通过其 addEventHandler 方法注册的 Socket 按照是否注册消息类型来构建三个集合:
123456789101112131415161718for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it){ if (it->second->accepts(_pReadableNotification)) { readable.push_back(it->first); nSockets++; } if (it->second->accepts(_pWritableNotification)) { writable.push_back(it->first); nSockets++; } if (it->second->accepts(_pErrorNotification)) { except.push_back(it->first); nSockets++; }}然后,通过 Poco::NET::Socket::select 方法将集合中准备就绪的 Socket 挑出(底层通过std::swap交换),然后向 Socket 的 Observer 发送相应通知,如果没有就绪的话就进入等待:
12345678910111213141516if (nSockets == 0){ onIdle(); Thread::trySleep(_timeout.totalMilliseconds());}else if (Socket::select(readable, writable, except, _timeout)){ onBusy(); for (Socket::SocketList::iterator it = readable.begin(); it != readable.end(); ++it) dispatch(*it, _pReadableNotification); for (Socket::SocketList::iterator it = writable.begin(); it != writable.end(); ++it) dispatch(*it, _pWritableNotification); for (Socket::SocketList::iterator it = except.begin(); it != except.end(); ++it) dispatch(*it, _pErrorNotification);}※参考文献高性能IO模型浅析
Poco官方文档之Network Programming
libevent之Reactor模式
※开源:Poco可移植组件库配置与使用[5]※环境配置※Windows12345678910cd pocomkdir cmake-build# 进入build并编译cd cmake-buildcmake ..# 管理员身份运行# 生成libcmake --build . --config Release# 安装cmake --build . --target install※Ubuntu123456789101112131415161718# apt安装sudo apt install libpoco-dev# 源码安装# 安装依赖sudo apt-get install -y g++ make openssl libssl-dev# 下载解压wget https://pocoproject.org/releases/poco-1.11.0/poco-1.11.0-all.tar.gztar -xvf poco-1.11.0-all.tar.gz# 编译cd poco-1.11.0-all./configure --no-tests --no-samplescd buildcmake .. && makesudo make install# 链接需要用到的组件,-lPocoFoundation -lPocoUtil -lPocoNet -lPocoXML是4个基本组件g++ -o main main.cpp -lPocoFoundation -lPocoUtil -lPocoNet -lPocoJSON && ./main※web服务示例官方示例,实现了一个简单的多线程web服务器,为单个HTML页面提供服务,使用Foundation, Net和Util库,生成的网页在8080端口:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263#include "Poco/Net/HTTPServer.h"#include "Poco/Net/HTTPRequestHandler.h"#include "Poco/Net/HTTPRequestHandlerFactory.h"#include "Poco/Net/HTTPServerRequest.h"#include "Poco/Net/HTTPServerResponse.h"#include "Poco/Net/ServerSocket.h"#include "Poco/Util/ServerApplication.h"#include using namespace Poco;using namespace Poco::Net;using namespace Poco::Util;class HelloRequestHandler: public HTTPRequestHandler{ void handleRequest(HTTPServerRequest& request, HTTPServerResponse& response) { Application& app = Application::instance(); app.logger().information("Request from %s", request.clientAddress().toString()); response.setChunkedTransferEncoding(true); response.setContentType("text/html"); response.send() << "" << "Hello" << "Hello from the POCO Web Server
" << ""; }};class HelloRequestHandlerFactory: public HTTPRequestHandlerFactory{ HTTPRequestHandler* createRequestHandler(const HTTPServerRequest&) { return new HelloRequestHandler; }};class WebServerApp: public ServerApplication{ void initialize(Application& self) { loadConfiguration(); ServerApplication::initialize(self); } int main(const std::vector&) { UInt16 port = static_cast(config().getUInt("port", 8080)); HTTPServer srv(new HelloRequestHandlerFactory, port); srv.start(); logger().information("HTTP Server started on port %hu.", port); waitForTerminationRequest(); logger().information("Stopping HTTP Server..."); srv.stop(); return Application::EXIT_OK; }};POCO_SERVER_MAIN(WebServerApp)编译运行:
1g++ -o main main.cpp -lPocoFoundation -lPocoNet -lPocoUtil && ./main※Json解析示例1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677#include #include #include "Poco/JSON/Object.h"#include "Poco/JSON/Parser.h"#include "Poco/Dynamic/Var.h"#include "Poco/JSON/Stringifier.h"int main(){ /* 解析json & 从文件中解析json */ std::string jsonString = R"({"name": "John", "age": 30, "city": "New York"})"; // std::ifstream file("data.json"); // if (!file.is_open()) { // std::cerr << "Failed to open file." << std::endl; // return 1; // } // std::string jsonString((std::istreambuf_iterator(file)), std::istreambuf_iterator()); // 创建 JSON 解析器 Poco::JSON::Parser parser; Poco::Dynamic::Var result; try { // 解析 JSON 字符串 result = parser.parse(jsonString); } catch (const Poco::Exception& ex) { std::cerr << "JSON parsing error: " << ex.displayText() << std::endl; return 1; } // 将解析结果转换为 Poco::JSON::Object 类型 Poco::JSON::Object::Ptr object = result.extract(); // 获取和操作 JSON 对象中的值 std::string name = object->getValue("name"); int age = object->getValue("age"); std::string city = object->getValue("city"); // 打印结果 std::cout << "Name: " << name << std::endl; std::cout << "Age: " << age << std::endl; std::cout << "City: " << city << std::endl; /* 生成json & 写入到json文件 */ // 创建 JSON 对象 Poco::JSON::Object jsonObject; // 添加键值对 jsonObject.set("name", "John"); jsonObject.set("age", 30); jsonObject.set("city", "New York"); // 将 JSON 对象转换为字符串 std::ostringstream oss; Poco::JSON::Stringifier::stringify(jsonObject, oss); std::string jsonString2 = oss.str(); // 打印生成的 JSON 字符串 std::cout << jsonString2 << std::endl; // // 写入 JSON 字符串到文件 // std::ofstream file("data.json"); // if (file.is_open()) { // file << jsonString2; // file.close(); // std::cout << "JSON data has been written to file." << std::endl; // } else { // std::cerr << "Failed to open file." << std::endl; // return 1; // } return 0;}※多线程示例12345678910111213141516171819202122232425262728293031323334353637383940#include #include "Poco/Thread.h"#include "Poco/Runnable.h"class MyTask : public Poco::Runnable{public: void run() override { for (int i = 0; i < 5; ++i) { std::cout << "Thread ID: " << Poco::Thread::currentTid() << " Task ID: " << i << std::endl; Poco::Thread::sleep(1000); } }};int main(){ // 创建线程任务对象 MyTask task; // 创建线程对象并启动 Poco::Thread thread; thread.start(task); // 主线程继续执行其他任务 for (int i = 0; i < 5; ++i) { std::cout << "Main Thread ID: " << Poco::Thread::currentTid() << " Main Task ID: " << i << std::endl; Poco::Thread::sleep(500); } // 等待子线程结束 thread.join(); return 0;}※日期时间示例1234567891011121314151617181920212223242526#include #include "Poco/DateTime.h"#include "Poco/DateTimeFormatter.h"int main(){ // 获取当前日期和时间 Poco::DateTime now; // 格式化日期和时间为字符串 std::string formattedDateTime = Poco::DateTimeFormatter::format(now, "%Y-%m-%d %H:%M:%S"); // 输出格式化后的日期和时间 std::cout << "Formatted Date and Time: " << formattedDateTime << std::endl; // 获取日期部分 Poco::DateTime date(now.year(), now.month(), now.day()); // 格式化日期为字符串 std::string formattedDate = Poco::DateTimeFormatter::format(date, "%Y-%m-%d"); // 输出格式化后的日期 std::cout << "Formatted Date: " << formattedDate << std::endl; return 0;}※生成uuid示例123456789101112131415161718192021#include #include "Poco/UUIDGenerator.h"#include "Poco/UUID.h"int main(){ // 使用默认的UUID生成器 Poco::UUIDGenerator generator; // 生成一个随机UUID Poco::UUID uuid1 = generator.createRandom(); // 生成一个基于时间的UUID Poco::UUID uuid2 = generator.createOne(); // 输出UUID std::cout << "Random UUID: " << uuid1.toString() << std::endl; std::cout << "Time-based UUID: " << uuid2.toString() << std::endl; return 0;}※poco 网络编程[6]1234567891011讲一下,网络IO模型有哪几个种类, 阻塞:1.一问一答(send()一个,recv() 一个)2.一问多答(send()一个,recv() 多个) 非阻塞:1.一个线程一直recv(), send() 按需发送2.一个线程send(),一个线程recv() 所有的网络IO模型 ,建议参考 陈硕 参考【翻译】两种高性能I/O设计模式(Reactor/Proactor)的比较
还需要根据自己的业务 确认使用长连接,短连接(有状态,无状态), TCP还是UDP, 服务端超时T线, 黑名单, !如何使用POCO 最好还是应该从代码实现来看,确认各个类的继承关系,全部看完基本就知道,自己要用什么了。(使用参考TestSuit,Sample)
123456789101112131415161718192021222324252627282930看需求,确认用哪个类, - 如果你用http进行POST GET REQUEST处理,那么 *HTTP、HTTPClient、 HTTPServer*应该是你的选择, - 如果你要用ping命令探查一些主机是否在线,那么就用*ICMP* 。 - 如果要用socket进行收发数据, 先确认 是写client端还是Server端, 使用TCP还是UDP,是阻塞还是非阻塞,是否有发广播等等... client可以用:【Socket】中的Socket,StreamSocket, RawSocket,DatagramSocket... Server可以用:【Socket】中的Socket,StreamSocket, RawSocket,DatagramSocket... 【Reactor】中的ParallelSocketAcceptor,ParallelSocketReactor,SocketAcceptor,SocketConnector,SocketNotification,SocketNotifier,SocketReactor 【TCPServer】中的TCPServer,TCPServerConnection,TCPServerConnectionFactory,TCPServerDispatcher,TCPServerParams当然这只是其中的很小一部分。无状态,sendBytes(),recvBytes()都是不确定的首先sendBytes()有事件就发送,很简单没问题。那recvBytes()呢?如何一直接收数据? 这又回到更原始的问题,网络模型 select,poll,epoll,是什么,用什么实现的。我们知道,select,poll都是先把fd加入set中, 然后对set中所有fd进行轮训,确认是否可以read,write,或者exception.首先系统函数select中只是对set中所有的fd进行轮训一次。 那么select(),poll(),只是查询一次, 如果下次又来了数据怎么办?=>只能用while()不管去轮训状态了,所以不断接收数据的模型是这样的windows select参考https://msdn.microsoft.com/en-us/library/system.net.sockets.socket.select(v=vs.110).aspxhttps://msdn.microsoft.com/en-us/library/windows/desktop/ms740141(v=vs.85).aspxunix-like 参考http://www.tenouk.com/Module41.html※使用poco库,实现tcp client例子[7]※base_socket_connector.h文件,可以不用改动123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166#ifndef _BASE_SOCKET_CONNECTOR_H_#define _BASE_SOCKET_CONNECTOR_H_#include #include "Poco/Net/Net.h"#include "Poco/Net/SocketNotification.h"#include "Poco/Net/SocketAddress.h"#include "Poco/Net/StreamSocket.h"#include "Poco/Observer.h"typedef void (*CliSendCbFunc)(void *send_args, unsigned char *send_data, int max_send_len, int &send_len);typedef void (*CliRecvCbFunc)(void *recv_args, unsigned char *recv_data, int &recv_len);typedef struct TcpClientCallbackData{ CliSendCbFunc send_loaddata; // 装载待发送数据,send_data是要发送的数据,send_len是要发送的长度 CliRecvCbFunc recv_callback; // 收到数据后的回调函数,recv_data是接收到的数据,recv_len是接收的数据长度 unsigned char *send_buf; // 使用者提供的发送数据缓存空间,调用send_loaddata函数后,先将待发送数据拷贝到这里 unsigned char *recv_buf; // 使用者提供的接收数据缓存空间,当系统有可接收数据时,先接收到这里,再调用recv_callback,传递给使用者 int send_len; // 实际发送数据的长度 int max_send_len; // send_buf的最大长度 int max_recv_len; // recv_buf的最大长度 void *send_args; // 存放使用者自己的私有数据 void *recv_args; // 存放使用者自己的私有数据} TcpClientCallbackData;template class BaseSocketConnector{ public: explicit BaseSocketConnector(Poco::Net::SocketAddress &address) : socket_reactor_(0) { stream_socket_.connectNB(address); } BaseSocketConnector(Poco::Net::SocketAddress &address, Poco::Net::SocketReactor &reactor, TcpClientCallbackData &tcp_client_callback) : socket_reactor_(0) { stream_socket_.connectNB(address); RegisterConnector(reactor); tcp_client_callback_ = tcp_client_callback; } virtual ~BaseSocketConnector() { try { UnregisterConnector(); } catch (...) { poco_unexpected(); } } virtual void RegisterConnector(Poco::Net::SocketReactor &reactor) { socket_reactor_ = &reactor; socket_reactor_->addEventHandler(stream_socket_, Poco::Observer(*this, &BaseSocketConnector::OnReadable)); socket_reactor_->addEventHandler(stream_socket_, Poco::Observer(*this, &BaseSocketConnector::OnWritable)); socket_reactor_->addEventHandler(stream_socket_, Poco::Observer(*this, &BaseSocketConnector::OnTimeout)); socket_reactor_->addEventHandler(stream_socket_, Poco::Observer(*this, &BaseSocketConnector::OnError)); } virtual void UnregisterConnector() { if (socket_reactor_) { socket_reactor_->removeEventHandler(stream_socket_, Poco::Observer(*this, &BaseSocketConnector::OnReadable)); socket_reactor_->removeEventHandler(stream_socket_, Poco::Observer(*this, &BaseSocketConnector::OnWritable)); socket_reactor_->removeEventHandler(stream_socket_, Poco::Observer(*this, &BaseSocketConnector::OnTimeout)); socket_reactor_->removeEventHandler(stream_socket_, Poco::Observer(*this, &BaseSocketConnector::OnError)); } } void OnReadable(Poco::Net::ReadableNotification *readable_notification) { readable_notification->release(); int err = stream_socket_.impl()->socketError(); if (err) { OnError(err); UnregisterConnector(); } else { OnConnect(); } } void OnWritable(Poco::Net::WritableNotification *writable_notification) { writable_notification->release(); OnConnect(); } void OnConnect() { stream_socket_.setBlocking(true); NewTcpClientHandle(); UnregisterConnector(); } void OnError(Poco::Net::ErrorNotification *error_notification) { error_notification->release(); OnError(stream_socket_.impl()->socketError()); } void OnTimeout(Poco::Net::TimeoutNotification *timeout_notification) { timeout_notification->release(); OnError(stream_socket_.impl()->socketError()); } // void GetSendLen(int &send_len) { send_len = tcp_client_callback_.send_len; } protected: virtual TcpClientHandle *NewTcpClientHandle() { return new TcpClientHandle(stream_socket_, socket_reactor_, tcp_client_callback_); } virtual void OnError(int errorCode) { socket_reactor_->stop(); stream_socket_.close(); } Poco::Net::SocketReactor *GetReactor() { return socket_reactor_; } Poco::Net::StreamSocket &GetSocket() { return stream_socket_; } private: BaseSocketConnector(); BaseSocketConnector(const BaseSocketConnector &); BaseSocketConnector &operator=(const BaseSocketConnector &); Poco::Net::StreamSocket stream_socket_; Poco::Net::SocketReactor *socket_reactor_; // TcpClientCallbackData tcp_client_callback_;};#endif // _BASE_SOCKET_CONNECTOR_H_※base_tcp_client.h,根据应用,可以进行修改123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131#ifndef _BASE_TCP_CLIENT_H_#define _BASE_TCP_CLIENT_H_#include #include "base_socket_connector.h"#include "Poco/Net/Net.h"#include "Poco/Net/SocketNotification.h"#include "Poco/Net/SocketAddress.h"#include "Poco/Net/StreamSocket.h"#include "Poco/Observer.h"class BaseTcpClient{ public: BaseTcpClient(Poco::Net::StreamSocket &socket, Poco::Net::SocketReactor *reactor, TcpClientCallbackData &tcp_client_callback) : socket_(socket), reactor_(reactor), on_readable_(*this, &BaseTcpClient::OnReadable), on_writable_(*this, &BaseTcpClient::OnWritable), on_timeout_(*this, &BaseTcpClient::OnTimeout), timeout_(false), close_on_timeout_(false), tcp_client_callback_(0) { tcp_client_callback_ = &tcp_client_callback; reactor_->addEventHandler(socket_, on_readable_); reactor_->addEventHandler(socket_, on_writable_); reactor_->addEventHandler(socket_, on_timeout_); } ~BaseTcpClient() { reactor_->removeEventHandler(socket_, Poco::Observer(*this, &BaseTcpClient::OnReadable)); reactor_->removeEventHandler(socket_, Poco::Observer(*this, &BaseTcpClient::OnWritable)); reactor_->removeEventHandler(socket_, Poco::Observer(*this, &BaseTcpClient::OnTimeout)); } void OnReadable(Poco::Net::ReadableNotification *readable_notification) { int recv_len = 0; readable_notification->release(); if (NULL == tcp_client_callback_) { return; } recv_len = socket_.receiveBytes(tcp_client_callback_->recv_buf, tcp_client_callback_->max_recv_len); if (recv_len > 0) { // 使用者的回调函数,由使用者处理接收到的数据 tcp_client_callback_->recv_callback(tcp_client_callback_->recv_args, tcp_client_callback_->recv_buf, recv_len); } else { std::cout << "BaseTcpClient - Read Error." << std::endl; reactor_->stop(); socket_.close(); delete this; } } void OnWritable(Poco::Net::WritableNotification *writable_notification) { writable_notification->release(); if (NULL == tcp_client_callback_) { return; } // 回调获取使用者发送的数据 tcp_client_callback_->send_len = -1; tcp_client_callback_->send_loaddata(tcp_client_callback_->send_args, tcp_client_callback_->send_buf, tcp_client_callback_->max_send_len, tcp_client_callback_->send_len); // 待发送数据长度大于0 if (tcp_client_callback_->send_len >= 0) { // 调用实际发送函数 tcp_client_callback_->send_len = socket_.sendBytes(tcp_client_callback_->send_buf, tcp_client_callback_->send_len); } if (tcp_client_callback_->send_len < 0) { std::cout << "BaseTcpClient - Wirte Error." << std::endl; } } void OnTimeout(Poco::Net::TimeoutNotification *timeout_notification) { timeout_notification->release(); timeout_ = true; if (close_on_timeout_) { reactor_->stop(); socket_.close(); delete this; } } private: Poco::Net::StreamSocket socket_; Poco::Net::SocketReactor *reactor_; Poco::Observer on_readable_; Poco::Observer on_writable_; Poco::Observer on_timeout_; bool timeout_; bool close_on_timeout_; private: // TcpClientCallbackData *tcp_client_callback_;};#endif // _BASE_TCP_CLIENT_H_※main.c:123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "Poco/Net/SocketReactor.h"#include "Poco/Net/SocketNotification.h"#include "Poco/Net/SocketAcceptor.h"#include "Poco/Net/ParallelSocketAcceptor.h"#include "Poco/Net/StreamSocket.h"#include "Poco/Net/ServerSocket.h"#include "Poco/Net/SocketAddress.h"#include "Poco/Observer.h"#include "Poco/Exception.h"#include "Poco/LocalDateTime.h"#include "Poco/DateTime.h"#include "Poco/DateTimeFormat.h"#include "Poco/DateTimeFormatter.h"#include "Poco/DateTimeParser.h"#include "Poco/Mutex.h"#include "Poco/Thread.h"#include "Poco/ThreadTarget.h"#include "base_tcp_client.h"/***************************************************************************** * * Macro definition * *****************************************************************************///#define CCT_CAMERA_SERVER_IP ("127.0.0.1")#define CCT_CAMERA_SERVER_PORT (8080)/***************************************************************************** * * Structure/Class definition * *****************************************************************************/#define MAX_BUF_SIZE 2048#define MAX_ONSEND_INTERVAL 10 /// ms/***************************************************************************** * * Data definition * *****************************************************************************///static pthread_cond_t send_ready = PTHREAD_COND_INITIALIZER;static pthread_mutex_t send_lock = PTHREAD_MUTEX_INITIALIZER;static uint8_t user_send_buf[2048];static int user_send_len;/***************************************************************************** * * Function prototype * *****************************************************************************////***************************************************************************** * * Function entity * *****************************************************************************///void PrintTimeStamp(char *str) { struct timeval tv; gettimeofday(&tv, NULL); printf("Time Stamp - %s, %ld:%ld\n\r", str, tv.tv_sec, tv.tv_usec);}static int StartThread(void *(*thread_entry)(void *), void *thread_para) { int rtn; pthread_attr_t thread_attr; pthread_t thread_ID; pthread_attr_init(&thread_attr); pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED); rtn = pthread_create(&thread_ID, &thread_attr, thread_entry, thread_para); return rtn;}void UserSendFuncLoadData_(void){ pthread_mutex_lock(&send_lock); Poco::LocalDateTime now; std::string time_str = Poco::DateTimeFormatter::format(now, Poco::DateTimeFormat::ISO8601_FRAC_FORMAT); user_send_len = sprintf((char *)user_send_buf, "Hello, %s", time_str.c_str()); user_send_buf[user_send_len] = 0; user_send_len ++; pthread_cond_signal(&send_ready); pthread_mutex_unlock(&send_lock);}void UserSendFuncLoadData(void *send_args, unsigned char *send_data, int max_send_len, int &send_len){ pthread_mutex_lock(&send_lock); pthread_cond_wait(&send_ready, &send_lock); strcpy((char *)send_data, (char *)user_send_buf); send_len = sprintf((char *)send_data, "%s, %s\n\r", (char *)user_send_buf, (char *)send_args); send_data[send_len ++] = 0; //printf("%s - %s\n\r", __func__, send_data); pthread_mutex_unlock(&send_lock);}void UserRecvFuncCallback(void *recv_args, unsigned char *recv_data, int &recv_len){ recv_data[recv_len] = '\0'; std::cout << "UserRecvFuncCallback - " << recv_data << "\n\r";}static void *SendDataThrd(void *para) { while (1) { UserSendFuncLoadData_(); sleep(1); }}static void TcpClientTestThrd(char *str){ unsigned char recv_buf[MAX_BUF_SIZE]; unsigned char send_buf[MAX_BUF_SIZE]; TcpClientCallbackData tcp_client_callback_data; tcp_client_callback_data.recv_args = NULL; tcp_client_callback_data.send_args = (void *)str; tcp_client_callback_data.recv_callback = UserRecvFuncCallback; tcp_client_callback_data.send_loaddata = UserSendFuncLoadData; tcp_client_callback_data.send_buf = send_buf; tcp_client_callback_data.recv_buf = recv_buf; tcp_client_callback_data.max_recv_len = MAX_BUF_SIZE; tcp_client_callback_data.max_send_len = MAX_BUF_SIZE; int server_port = CCT_CAMERA_SERVER_PORT; std::string server_ip(CCT_CAMERA_SERVER_IP); Poco::Net::SocketAddress socket_addr(server_ip.c_str(), server_port); StartThread(SendDataThrd, NULL); do { SocketReactor reactor; BaseSocketConnector connector(socket_addr, reactor, tcp_client_callback_data); //std::cout << "before run\n\r"; reactor.run(); //std::cout << "after run\n\r"; Poco::Thread::sleep(MAX_ONSEND_INTERVAL); } while (1);}int main(int argc, char ** argv){ if (argc < 2) { printf("%s some_info\n\r", argv[0]); return 0; } TcpClientTestThrd(argv[1]); return 0;}※makefile:123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100export TOP_DIR = $(PWD)# 包含参数配置文件#include ./Makefile.param# 目标文件名称、编译后文件位置PWD := $(shell pwd)INSTALLDIR = .TARGET = tcp_client_testDBGTARGET = $(INSTALLDIR)/$(TARGET)dRLSTARGET = $(INSTALLDIR)/$(TARGET)# 配置编译器CC = $(CROSS_COMPILE)gccCXX = $(CROSS_COMPILE)g++LD_CXX = $(CROSS_COMPILE)g++STRIP = $(CROSS_COMPILE)stripRM = rm# 配置依赖头文件、库文件路径INCPATH += -I. -I$(REL_INC) -I/usr/local/include LIBPATH += -L. -L$(REL_LIB) -L/usr/local/lib # 配置编译参数FLAGS_C += $(MACRO_DEFINE) $(INCPATH) $(CFLAGS) -Wall FLAGS_CXX += $(MACRO_DEFINE) $(INCPATH) $(CFLAGS) -Wall LDFLAGS += $(CFLAGS) $(LIBPATH) $(LIBS_CFLAGS) -pie -fPIE LIBS += -lPocoFoundation -lPocoNet -lpthread -lm -lc -lgcc -lgcc_s -ldl DBG_FLAGS = -g -D__DEBUGRLS_FLAGS = -O2 -fno-strict-aliasing# 源文件包含SRCS_C = $(wildcard *.c)SRCS_CXX = $(wildcard *.cpp)SRCS_H = $(wildcard *.h*)# obj文件包含OBJ_DBG_DIR = 0-obj/$(CROSS_COMPILE)dbgOBJ_RLS_DIR = 0-obj/$(CROSS_COMPILE)rlsOBJS_C_DBG = $(addprefix $(OBJ_DBG_DIR)/,$(SRCS_C:%.c=%.o))OBJS_CXX_DBG = $(addprefix $(OBJ_DBG_DIR)/,$(SRCS_CXX:%.cpp=%.o))OBJS_C_RLS = $(addprefix $(OBJ_RLS_DIR)/,$(SRCS_C:%.c=%.o))OBJS_CXX_RLS = $(addprefix $(OBJ_RLS_DIR)/,$(SRCS_CXX:%.cpp=%.o))# 编译动作COMPILE_C = $(CC) $(FLAGS_C) -cCOMPILE_CXX = $(CXX) $(FLAGS_CXX) -cLINK_CXX = $(LD_CXX) $(LDFLAGS) .PHONY: clean debug release installall: releaseinstall: install -d $(EXEC_DIR) install $(RLSTARGET) $(EXEC_DIR) install -m 444 $(TARGET).txt $(EXEC_DIR)release: $(RLSTARGET) $(STRIP) $(RLSTARGET)debug: $(DBGTARGET) @echo $(TARGET)_debug_done$(RLSTARGET): $(OBJS_C_RLS) $(OBJS_CXX_RLS) @mkdir -p $(INSTALLDIR) $(LINK_CXX) -o $@ $^ $(LIBS) $(DBGTARGET): $(OBJS_C_DBG) $(OBJS_CXX_DBG) @mkdir -p $(INSTALLDIR) $(LINK_CXX) -o $@ $^ $(LIBS) $(OBJS_CXX_RLS): $(OBJ_RLS_DIR)/%.o: %.cpp $(SRCS_H) @mkdir -p $(OBJ_RLS_DIR) $(COMPILE_CXX) $(RLS_FLAGS) -o $@ $< $(OBJS_C_RLS): $(OBJ_RLS_DIR)/%.o: %.c $(SRCS_H) @mkdir -p $(OBJ_RLS_DIR) $(COMPILE_C) $(RLS_FLAGS) -o $@ $< $(OBJS_CXX_DBG): $(OBJ_DBG_DIR)/%.o: %.cpp $(SRCS_H) @mkdir -p $(OBJ_DBG_DIR) $(COMPILE_CXX) $(DBG_FLAGS) -o $@ $<$(OBJS_C_DBG): $(OBJ_DBG_DIR)/%.o: %.c $(SRCS_H) @mkdir -p $(OBJ_DBG_DIR) $(COMPILE_C) $(DBG_FLAGS) -o $@ $
https://blog.csdn.net/flyfish1986/article/details/130985212 ↩︎
https://blog.csdn.net/poechant/article/details/7485760 ↩︎
https://leiyiming.com/2017/09/23/reactor/ ↩︎
https://blog.csdn.net/qq_40344790/article/details/129745804 ↩︎
https://www.cnblogs.com/scotth/p/6181583.html ↩︎
https://blog.csdn.net/wuu19/article/details/102976698 ↩︎
https://blog.csdn.net/u010168781/article/details/134562223 ↩︎