4040#include < limits>
4141#include < PipingManager.h>
4242
43+
4344#ifdef WIN32
4445#ifndef NOMINMAX
4546#define NOMINMAX
5657
5758#define DEFAULT_TIMEOUT (10000 )
5859#define DEFAULT_BUFFER (1024 *128 )
59- #define NAMED_PIPE_BUFFER (1024 * 64 )
60- #define WRITE_SIZE (16 *1024 )
61- #define READ_SIZE (16 *1024 )
60+ #define NAMED_PIPE_BUFFER (128 * 1024 )
61+ #define WRITE_SIZE (64 *1024 )
62+ #define READ_SIZE (64 *1024 )
6263
64+ static const auto cvtimeout = std::chrono::milliseconds(50 );
6365
6466using namespace std ::chrono;
6567using std::vector;
@@ -188,6 +190,11 @@ struct PipeData
188190 bool mConnected ;
189191#endif
190192
193+ std::condition_variable mInBufHasData ;
194+ std::condition_variable mInBufHasRoom ;
195+ std::condition_variable mOutBufHasData ;
196+ std::condition_variable mOutBufHasRoom ;
197+
191198 PipeData (std::string name, size_t maxbuf, pipe_type_t type, int id, bool client = false );
192199 ~PipeData ();
193200 void closeThread ();
@@ -212,41 +219,53 @@ int get_flag(pipe_type_t type)
212219}
213220#endif
214221
222+
223+ static bool stopWaitingForIQAvailable (PipeData* p) {
224+ return (p->mInBuf .Taken () > 0 );
225+ }
226+
215227static
216228int pipe_write_func (PipeData* data)
217229{
218230 piping_status_t status = PIPE_MGR_OK;
231+ std::unique_lock<std::mutex> lck (data->mMutex );
232+ data->mInBufHasData .wait_for (lck, cvtimeout, std::bind (stopWaitingForIQAvailable, data));
219233
220- data->mMutex .lock ();
221234 size_t writeSize = data->mInBuf .Taken ();
222235 if (writeSize > WRITE_SIZE) writeSize = WRITE_SIZE;
223236 data->mInBuf .PeekFront (data->mTempBuf .data (), writeSize);
224- data-> mMutex .unlock ();
237+ lck .unlock ();
225238
226239#ifdef WIN32
227240 DWORD bytes_written = 0 ;
228- if (WriteFile (data->mHandle , data->mTempBuf .data (), (DWORD)writeSize, &bytes_written, NULL ) == FALSE )
229- {
230- DWORD last_error = GetLastError ();
231- data->mErrorString = std::to_string (last_error);
232- status = PIPE_MGR_WRITE_ERROR;
241+ if (writeSize > 0 ) {
242+ if (WriteFile (data->mHandle , data->mTempBuf .data (), (DWORD)writeSize, &bytes_written, NULL ) == FALSE )
243+ {
244+ DWORD last_error = GetLastError ();
245+ data->mErrorString = std::to_string (last_error);
246+ status = PIPE_MGR_WRITE_ERROR;
247+ }
233248 }
234249#else
235- int bytes_written = write (data-> mHandle , data-> mTempBuf . data (), writeSize) ;
236- if (bytes_written < 0 )
237- {
238- if (errno != EAGAIN )
250+ int bytes_written = 0 ;
251+ if (writeSize > 0 ) {
252+ bytes_written = write (data-> mHandle , data-> mTempBuf . data (), writeSize);
253+ if (bytes_written < 0 )
239254 {
240- data->mErrorString = " write returned " + std::to_string (bytes_written) + " (errno=" + std::to_string ((int )errno) + " )." ;
241- status = PIPE_MGR_WRITE_ERROR;
255+ if (errno != EAGAIN)
256+ {
257+ data->mErrorString = " write returned " + std::to_string (bytes_written) + " (errno=" + std::to_string ((int )errno) + " )." ;
258+ status = PIPE_MGR_WRITE_ERROR;
259+ }
260+ bytes_written = 0 ;
242261 }
243- bytes_written = 0 ;
244262 }
245263#endif
246264 if (bytes_written > 0 )
247265 {
248266 data->mMutex .lock ();
249267 data->mInBuf .PopFront (bytes_written);
268+ data->mInBufHasRoom .notify_all ();
250269 data->mMutex .unlock ();
251270 data->mTotalDataWritten += bytes_written;
252271 }
@@ -258,11 +277,18 @@ int pipe_write_func(PipeData* data)
258277 return -1 ;
259278}
260279
280+ static bool stopWaitingForOQFree (PipeData* p) {
281+ return (p->mOutBuf .Free () > 0 );
282+ }
283+
261284static
262285int pipe_read_func (PipeData* data)
263286{
264287 piping_status_t status = PIPE_MGR_OK;
288+ std::unique_lock<std::mutex> lck (data->mMutex );
289+ data->mOutBufHasRoom .wait_for (lck, cvtimeout, std::bind (stopWaitingForOQFree, data));
265290 size_t readSize = data->mOutBuf .Free ();
291+ lck.unlock ();
266292 if (readSize > READ_SIZE) readSize = READ_SIZE;
267293 if (readSize == 0 ) return 0 ;
268294
@@ -301,6 +327,7 @@ int pipe_read_func(PipeData* data)
301327 {
302328 data->mMutex .lock ();
303329 data->mOutBuf .Append (data->mTempBuf .data (), bytes_read);
330+ data->mOutBufHasData .notify_all ();
304331 data->mMutex .unlock ();
305332 }
306333
@@ -354,7 +381,6 @@ void pipe_thread_func(PipeData* data)
354381 int written_bytes = 0 ;
355382 int read_bytes = 0 ;
356383 bool success= true ;
357-
358384 // run main thread loop
359385 while (data->mStop == false )
360386 {
@@ -386,9 +412,11 @@ void pipe_thread_func(PipeData* data)
386412 break ;
387413 }
388414
415+ #if 1
389416 if (written_bytes == 0 && read_bytes == 0 ) {
390417 std::this_thread::sleep_for (std::chrono::microseconds (100 ));
391418 }
419+ #endif
392420 }
393421
394422#ifdef WIN32
@@ -688,6 +716,11 @@ piping_status_t PipingManager::getPipePath(int pipe_id, std::string& path)
688716 }
689717}
690718
719+
720+ static bool stopWaitingForIQFree (PipeData* p) {
721+ return (p->mInBuf .Free () > 0 );
722+ }
723+
691724piping_status_t PipingManager::writeToPipe (int pipe_id, void * buffer, size_t data_size, size_t & bytes_written)
692725{
693726 piping_status_t status = PIPE_MGR_OK;
@@ -701,7 +734,9 @@ piping_status_t PipingManager::writeToPipe(int pipe_id, void* buffer, size_t dat
701734 else
702735 {
703736 PipeData* pipe = it->second ;
704- std::lock_guard<std::mutex> lock (pipe->mMutex );
737+
738+ std::unique_lock<std::mutex> lck (pipe->mMutex );
739+ pipe->mInBufHasRoom .wait_for (lck, cvtimeout, std::bind (stopWaitingForIQFree, pipe));
705740
706741 size_t data_to_write = data_size;
707742 if (data_to_write > pipe->mInBuf .Free ())
@@ -717,6 +752,7 @@ piping_status_t PipingManager::writeToPipe(int pipe_id, void* buffer, size_t dat
717752 {
718753 pipe->mInBuf .Append ((char *)buffer, data_to_write);
719754 bytes_written = data_to_write;
755+ pipe->mInBufHasData .notify_all ();
720756 }
721757
722758 status = pipe->mTimeout ? PIPE_MGR_TIMEOUT : status;
@@ -725,6 +761,10 @@ piping_status_t PipingManager::writeToPipe(int pipe_id, void* buffer, size_t dat
725761 return status;
726762}
727763
764+ static bool stopWaitingForOQAvailable (PipeData* p) {
765+ return (p->mOutBuf .Taken () > 0 );
766+ }
767+
728768piping_status_t PipingManager::readFromPipe (int pipe_id, void * buffer, size_t buffer_size, size_t & bytes_read)
729769{
730770 piping_status_t status = PIPE_MGR_OK;
@@ -737,7 +777,9 @@ piping_status_t PipingManager::readFromPipe(int pipe_id, void* buffer, size_t bu
737777 else
738778 {
739779 PipeData* pipe = it->second ;
740- std::lock_guard<std::mutex> lock (pipe->mMutex );
780+
781+ std::unique_lock<std::mutex> lck (pipe->mMutex );
782+ pipe->mOutBufHasData .wait_for (lck, cvtimeout, std::bind (stopWaitingForOQAvailable, pipe));
741783
742784 size_t data_to_read = buffer_size;
743785 if (data_to_read > pipe->mOutBuf .Taken ())
@@ -749,6 +791,7 @@ piping_status_t PipingManager::readFromPipe(int pipe_id, void* buffer, size_t bu
749791 {
750792 pipe->mOutBuf .PopFront ((char *)buffer, data_to_read);
751793 bytes_read = data_to_read;
794+ pipe->mOutBufHasRoom .notify_all ();
752795 }
753796
754797 status = pipe->mTimeout ? PIPE_MGR_TIMEOUT : pipe->mStatus ;
0 commit comments