From 1d09e97b7b233d6952ad3e3ba2fcbbfb9263473d Mon Sep 17 00:00:00 2001 From: Dave Streeter Date: Mon, 23 Mar 2026 11:47:56 +0000 Subject: [PATCH] HPCC-36047 hthor refactor temporary file handling New temporary file handling interface created. hthor changed to use the new interface. Signed-off-by: Dave Streeter --- ecl/eclagent/agentctx.hpp | 21 ++- ecl/eclagent/eclagent.cpp | 84 ++++++++---- ecl/eclagent/eclagent.ipp | 24 ++-- ecl/hthor/hthor.cpp | 282 +++++++++++++++++++------------------- 4 files changed, 227 insertions(+), 184 deletions(-) diff --git a/ecl/eclagent/agentctx.hpp b/ecl/eclagent/agentctx.hpp index 73444006563..21dc96f5358 100644 --- a/ecl/eclagent/agentctx.hpp +++ b/ecl/eclagent/agentctx.hpp @@ -80,7 +80,17 @@ interface IOrderedOutputSerializer; typedef enum { ofSTD, ofXML, ofRAW } outputFmts; enum class AccessMode : unsigned; -struct IAgentContext : extends IGlobalCodeContext +interface ITempFileHandler +{ + // name is the name/id of the temporary file, not the filename on disk. + virtual const char *noteTemporaryFile(const char *name) = 0; + // name is the name/id of the temporary file, not the filename on disk. + virtual const char *queryTemporaryFile(const char *name) = 0; + // fname is the temporary filename on disk returned from noteTemporaryFile or queryTemporaryFile. + virtual void removeTemporaryFile(const char *fname) = 0; +}; + +struct IAgentContext : extends IGlobalCodeContext, extends ITempFileHandler { virtual void reportProgress(const char *msg, unsigned flags=0) = 0; virtual bool queryResolveFilesLocally() = 0; @@ -92,12 +102,9 @@ struct IAgentContext : extends IGlobalCodeContext virtual IConstWorkUnit *queryWorkUnit() const = 0; virtual IWorkUnit *updateWorkUnit() const = 0; - + virtual ILocalOrDistributedFile *resolveLFN(const char *logicalName, const char *errorTxt, bool optional, bool noteRead, AccessMode accessMode, StringBuffer * expandedlfn, bool isPrivilegedUser) = 0; virtual StringBuffer & getTempfileBase(StringBuffer & buff) = 0; - virtual const char *noteTemporaryFile(const char *fname) = 0; - virtual const char *noteTemporaryFilespec(const char *fname) = 0; - virtual const char *queryTemporaryFile(const char *fname) = 0; virtual void reloadWorkUnit() = 0; virtual char *resolveName(const char *in, char *out, unsigned outlen) = 0; @@ -110,14 +117,14 @@ struct IAgentContext : extends IGlobalCodeContext virtual void outputFormattedResult(const char *name, unsigned sequence, bool close) = 0; virtual const char *queryAllowedPipePrograms() = 0; - + virtual IOrderedOutputSerializer * queryOutputSerializer() = 0; virtual IGroup *getHThorGroup(StringBuffer &grpnameout) = 0; virtual RecordTranslationMode getLayoutTranslationMode() const = 0; virtual unsigned __int64 queryStopAfter() = 0; - + virtual const char *queryWuid() = 0; virtual void updateWULogfile(IWorkUnit *outputWU) = 0; diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp index dc3a1f25c8a..a480d2db0e4 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -610,51 +610,85 @@ const char *EclAgent::queryTempfilePath() return agentTempDir.str(); } -StringBuffer & EclAgent::getTempfileBase(StringBuffer & buff) +StringBuffer &EclAgent::getTempfileBase(StringBuffer &buff) { return buff.append(queryTempfilePath()).append(PATHSEPCHAR).appendLower(wuid); } -const char *EclAgent::queryTemporaryFile(const char *fname) +void EclAgent::buildTempFilename(StringBuffer &tempFilename, const char *name) { - StringBuffer tempfilename; - getTempfileBase(tempfilename).append(PATHSEPCHAR).append(fname); - CriticalBlock crit(tfsect); - ForEachItemIn(idx, tempFiles) + getTempfileBase(tempFilename).append(PATHSEPCHAR).append(name); +} + +const char *EclAgent::queryTemporaryFile(const char *name) +{ + dbgassertex(!isEmptyString(name)); + + StringBuffer tempFilename; + buildTempFilename(tempFilename, name); + { - if (strcmp(tempFiles.item(idx), tempfilename.str())==0) - return tempFiles.item(idx); + CriticalBlock crit(tfsect); + + auto it = tempFileSet.find(tempFilename.str()); + if (it != tempFileSet.end()) + return it->c_str(); } - StringBuffer errmsg; - errmsg.append("Attempt to read temp file that has not yet been registered: ").append(tempfilename); + + VStringBuffer errmsg("Attempt to read temp file that has not yet been registered: %s", name); fail(0, errmsg.str()); - return 0; + return nullptr; } -const char *EclAgent::noteTemporaryFile(const char *fname) +const char *EclAgent::noteTemporaryFile(const char *name) { - StringBuffer tempfilename; - getTempfileBase(tempfilename).append(PATHSEPCHAR).append(fname); - CriticalBlock crit(tfsect); - tempFiles.append(tempfilename.str()); - return tempFiles.item(tempFiles.length()-1); + dbgassertex(!isEmptyString(name)); + + StringBuffer tempFilename; + buildTempFilename(tempFilename, name); + + std::pair::iterator, bool> inserted; + { + CriticalBlock crit(tfsect); + + inserted = tempFileSet.emplace(tempFilename.str()); + } + + // The returned pointer refers to the std::string stored in tempFileSet and + // is only valid while that entry remains in the set. + + if (!inserted.second) + { + VStringBuffer errmsg("Temp file already registered: %s", name); + fail(0, errmsg.str()); + return nullptr; + } + + return inserted.first->c_str(); } -const char *EclAgent::noteTemporaryFilespec(const char *fspec) +void EclAgent::removeTemporaryFile(const char *fname) { + dbgassertex(!isEmptyString(fname)); + CriticalBlock crit(tfsect); - tempFiles.append(fspec); - return tempFiles.item(tempFiles.length()-1); + + auto it = tempFileSet.find(std::string(fname)); + dbgassertex(it != tempFileSet.end()); + if (it != tempFileSet.end()) + { + remove(it->c_str()); + tempFileSet.erase(it); + } } void EclAgent::deleteTempFiles() { CriticalBlock crit(tfsect); - ForEachItemIn(idx, tempFiles) - { - remove(tempFiles.item(idx)); - } - tempFiles.kill(); + + for (const auto& f : tempFileSet) + remove(f.c_str()); + tempFileSet.clear(); } const char *EclAgent::loadResource(unsigned id) diff --git a/ecl/eclagent/eclagent.ipp b/ecl/eclagent/eclagent.ipp index d76aec836ab..5a80c02d050 100644 --- a/ecl/eclagent/eclagent.ipp +++ b/ecl/eclagent/eclagent.ipp @@ -35,6 +35,8 @@ #include "thorcommon.hpp" #include "enginecontext.hpp" +#include + #define MAX_EDGEDATA_LENGTH 30000 #define MAX_HEX_SIZE 500 @@ -172,17 +174,17 @@ public: { return ctx->getTempfileBase(buff); } - virtual const char *noteTemporaryFile(const char *fname) + virtual const char *noteTemporaryFile(const char *name) override { - return ctx->noteTemporaryFile(fname); + return ctx->noteTemporaryFile(name); } - virtual const char *noteTemporaryFilespec(const char *fspec) + virtual const char *queryTemporaryFile(const char *name) override { - return ctx->noteTemporaryFilespec(fspec); + return ctx->queryTemporaryFile(name); } - virtual const char *queryTemporaryFile(const char *fname) + virtual void removeTemporaryFile(const char *fname) override { - return ctx->queryTemporaryFile(fname); + ctx->removeTemporaryFile(fname); } virtual void reloadWorkUnit() { @@ -350,7 +352,6 @@ public: virtual bool onDebuggerTimeout(); }; - class CHThorDebugContext; class EclAgent : implements IAgentContext, implements ICodeContext, implements IRowAllocatorMetaActIdCacheCallback, implements IEngineContext, public CInterface { @@ -378,7 +379,7 @@ private: outputFmts outputFmt = ofSTD; unsigned __int64 stopAfter; mutable CriticalSection wusect; - StringArray tempFiles; + std::set tempFileSet; // Set of actual temp file names on disk CriticalSection tfsect; IArray persistReadLocks; StringArray processedPersists; @@ -404,6 +405,7 @@ private: private: void doSetResultString(type_t type, const char * stepname, unsigned sequence, int len, const char *val); + void buildTempFilename(StringBuffer & tempFilename, const char *filename); IEclProcess *loadProcess(); StringBuffer & getTempfileBase(StringBuffer & buff); const char *queryTempfilePath(); @@ -615,9 +617,9 @@ public: virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash); virtual void reportProgress(const char *msg, unsigned flags=0); - virtual const char *noteTemporaryFile(const char *fname); - virtual const char *noteTemporaryFilespec(const char *fspec); - virtual const char *queryTemporaryFile(const char *fname); + virtual const char *noteTemporaryFile(const char *name) override; + virtual const char *queryTemporaryFile(const char *name) override; + virtual void removeTemporaryFile(const char *fname) override; virtual void deleteFile(const char * logicalName); void addException(ErrorSeverity severity, const char * source, unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool failOnError, bool isAbort); diff --git a/ecl/hthor/hthor.cpp b/ecl/hthor/hthor.cpp index a306d8f90f5..7765e60ecec 100644 --- a/ecl/hthor/hthor.cpp +++ b/ecl/hthor/hthor.cpp @@ -219,8 +219,8 @@ void CHThorActivityBase::ready() { if (input) input->ready(); - if (needsAllocator()) - createRowAllocator(); + if (needsAllocator()) + createRowAllocator(); initialProcessed = processed; } @@ -230,7 +230,7 @@ CHThorActivityBase::~CHThorActivityBase() } void CHThorActivityBase::createRowAllocator() { - if (!rowAllocator) + if (!rowAllocator) rowAllocator = agent.queryCodeContext()->getRowAllocator(outputMeta.queryOriginal(), activityId); } @@ -328,7 +328,7 @@ stat_type CHThorActivityBase::queryLocalCycles() const __int64 ret = activityStats.totalCycles; if (input) ret -= input->queryTotalCycles(); - if (ret < 0) + if (ret < 0) ret = 0; return ret; } @@ -344,14 +344,14 @@ CHThorSimpleActivityBase::CHThorSimpleActivityBase(IAgentContext &_agent, unsign { } -IHThorInput * CHThorSimpleActivityBase::queryOutput(unsigned index) -{ +IHThorInput * CHThorSimpleActivityBase::queryOutput(unsigned index) +{ assertex(index == 0); - return this; + return this; } bool CHThorSimpleActivityBase::isGrouped() -{ +{ return input ? input->isGrouped() : outputMeta.isGrouped(); } @@ -366,7 +366,7 @@ class CHThorClusterWriteHandler : public ClusterWriteHandler { IAgentContext &agent; public: - CHThorClusterWriteHandler(char const * _logicalName, char const * _activityType, IAgentContext &_agent) + CHThorClusterWriteHandler(char const * _logicalName, char const * _activityType, IAgentContext &_agent) : ClusterWriteHandler(_logicalName, _activityType), agent(_agent) { } @@ -475,9 +475,9 @@ CHThorDiskWriteActivity::~CHThorDiskWriteActivity() } } -void CHThorDiskWriteActivity::ready() -{ - CHThorActivityBase::ready(); +void CHThorDiskWriteActivity::ready() +{ + CHThorActivityBase::ready(); resolve(); uncompressedBytesWritten = 0; numRecords = 0; @@ -491,7 +491,7 @@ void CHThorDiskWriteActivity::execute() ActivityTimer t(activityStats, timeActivities); // Loop thru the results numRecords = 0; - while (next()) + while (next()) numRecords++; finishOutput(); } @@ -587,7 +587,7 @@ void CHThorDiskWriteActivity::resolve() Owned file = createIFile(filename); if (file->exists()) { - if (!overwrite) + if (!overwrite) throw MakeStringException(99, "Cannot write %s, file already exists (missing OVERWRITE attribute?)", full.str()); file->remove(); } @@ -679,7 +679,7 @@ void CHThorDiskWriteActivity::open() const void * CHThorDiskWriteActivity::getNext() { // through operation (writes and returns row) // needs a one row lookahead to preserve group - if (!nextrow.get()) + if (!nextrow.get()) { nextrow.setown(input->nextRow()); if (!nextrow.get()) @@ -700,11 +700,11 @@ bool CHThorDiskWriteActivity::next() if (!nextrow.get()) { OwnedConstRoxieRow row(input->nextRow()); - if (!row.get()) + if (!row.get()) { row.setown(input->nextRow()); if (!row.get()) - return false; // we are done + return false; // we are done if (grouped) outSeq->putRow(NULL); } @@ -972,12 +972,12 @@ void CHThorSpillActivity::stop() for (;;) { OwnedConstRoxieRow nextrec(nextRow()); - if (!nextrec) + if (!nextrec) { nextrec.setown(nextRow()); if (!nextrec) break; - } + } } finishOutput(); CHThorDiskWriteActivity::stop(); @@ -1197,7 +1197,7 @@ CHThorIndexWriteActivity::CHThorIndexWriteActivity(IAgentContext &_agent, unsign if (f) { - if (TIWoverwrite & helper.getFlags()) + if (TIWoverwrite & helper.getFlags()) { LOG(MCuserInfo, "Removing %s from DFS", lfn.str()); agent.logFileAccess(f, "HThor", "DELETED", _graph); @@ -1524,7 +1524,7 @@ void CHThorIndexWriteActivity::execute() Owned result = wu->updateResultBySequence(helper.getSequence()); if (result) { - result->setResultTotalRowCount(reccount); + result->setResultTotalRowCount(reccount); result->setResultStatus(ResultStatusCalculated); result->setResultLogicalName(lfn.str()); } @@ -2087,7 +2087,7 @@ const void *CHThorIterateActivity::nextRow() unsigned outSize = helper.transform(rowBuilder, left ? left : defaultRecord, right, ++counter); if (outSize) { - left.setown(rowBuilder.finalizeRowClear(outSize)); + left.setown(rowBuilder.finalizeRowClear(outSize)); processed++; return left.getLink(); } @@ -2338,7 +2338,7 @@ void CHThorNormalizeLinkedChildActivity::stop() { curParent.clear(); curChild.clear(); - CHThorSimpleActivityBase::stop(); + CHThorSimpleActivityBase::stop(); } const void * CHThorNormalizeLinkedChildActivity::nextRow() @@ -2525,7 +2525,7 @@ const void * CHThorFilterProjectActivity::nextRow() if (outSize) { processed++; - return rowBuilder.finalizeRowClear(outSize); + return rowBuilder.finalizeRowClear(outSize); } } catch(IException * e) @@ -3129,20 +3129,20 @@ const void * CHThorFilterActivity::nextRowGE(const void * seek, unsigned numFiel return ungroupedNextRow(); } -bool CHThorFilterActivity::gatherConjunctions(ISteppedConjunctionCollector & collector) -{ - return input->gatherConjunctions(collector); +bool CHThorFilterActivity::gatherConjunctions(ISteppedConjunctionCollector & collector) +{ + return input->gatherConjunctions(collector); } -void CHThorFilterActivity::resetEOF() -{ +void CHThorFilterActivity::resetEOF() +{ //Sometimes the smart stepping code returns a premature eof indicator (two nulls) and will //therefore call resetEOF so the activity can reset its eof without resetting the activity itself. //Note that resetEOF only needs to be implemented by activities that implement gatherConjunctions() //and that cache eof. eof = false; anyThisGroup = false; - input->resetEOF(); + input->resetEOF(); } //===================================================================================================== @@ -3271,7 +3271,7 @@ const void * CHThorLimitActivity::nextRow() } processed++; } - + return ret.getClear(); } @@ -3290,7 +3290,7 @@ const void * CHThorLimitActivity::nextRowGE(const void * seek, unsigned numField } processed++; } - + return ret.getClear(); } @@ -3439,9 +3439,9 @@ CHThorOnFailLimitActivity::CHThorOnFailLimitActivity(IAgentContext &_agent, unsi { } -void CHThorOnFailLimitActivity::onLimitExceeded() -{ - buffer->clear(); +void CHThorOnFailLimitActivity::onLimitExceeded() +{ + buffer->clear(); RtlDynamicRowBuilder rowBuilder(rowAllocator); size32_t newSize = helper.transformOnLimitExceeded(rowBuilder); @@ -3618,15 +3618,15 @@ const void * CHThorAggregateActivity::nextRow() eof = true; return NULL; } - + RtlDynamicRowBuilder rowBuilder(rowAllocator); helper.clearAggregate(rowBuilder); - + if (next) { helper.processFirst(rowBuilder, next); ReleaseRoxieRow(next); - + bool abortEarly = (kind == TAKexistsaggregate) && !input->isGrouped(); if (!abortEarly) { @@ -3641,10 +3641,10 @@ const void * CHThorAggregateActivity::nextRow() } } } - + if (!input->isGrouped()) // either read all, or aborted early eof = true; - + processed++; size32_t finalSize = outputMeta.getRecordSize(rowBuilder.getSelf()); return rowBuilder.finalizeRowClear(finalSize); @@ -3734,7 +3734,7 @@ CHThorSelectNActivity::CHThorSelectNActivity(IAgentContext &_agent, unsigned _ac const void * CHThorSelectNActivity::defaultRow() { - if (!rowAllocator) + if (!rowAllocator) createRowAllocator(); //We delay as often not needed... RtlDynamicRowBuilder rowBuilder(rowAllocator); size32_t thisSize = helper.createDefault(rowBuilder); @@ -3972,13 +3972,13 @@ const void * CHThorChooseSetsExActivity::nextRow() //===================================================================================================== CHThorChooseSetsLastActivity::CHThorChooseSetsLastActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorChooseSetsExArg &_arg, ThorActivityKind _kind, EclGraph & _graph) : CHThorChooseSetsExActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph) -{ +{ numToSkip = (unsigned *)checked_calloc(sizeof(unsigned), numSets, "choose sets last"); } -CHThorChooseSetsLastActivity::~CHThorChooseSetsLastActivity() -{ - free(numToSkip); +CHThorChooseSetsLastActivity::~CHThorChooseSetsLastActivity() +{ + free(numToSkip); } void CHThorChooseSetsLastActivity::ready() @@ -4008,20 +4008,20 @@ bool CHThorChooseSetsLastActivity::includeRow(const void * row) return true; numToSkip[category-1]--; } - return false; + return false; } //===================================================================================================== CHThorChooseSetsEnthActivity::CHThorChooseSetsEnthActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorChooseSetsExArg &_arg, ThorActivityKind _kind, EclGraph & _graph) : CHThorChooseSetsExActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph) -{ +{ counter = (unsigned __int64 *)checked_calloc(sizeof(unsigned __int64), numSets, "choose sets enth"); } -CHThorChooseSetsEnthActivity::~CHThorChooseSetsEnthActivity() -{ - free(counter); +CHThorChooseSetsEnthActivity::~CHThorChooseSetsEnthActivity() +{ + free(counter); } void CHThorChooseSetsEnthActivity::ready() @@ -4045,9 +4045,9 @@ bool CHThorChooseSetsEnthActivity::includeRow(const void * row) { counter[category-1] -= setCounts[category-1]; return true; - } + } } - return false; + return false; } @@ -4640,7 +4640,7 @@ const void *CHThorGroupedActivity::nextRow() OwnedConstRoxieRow ret(next[nextRowIndex].getClear()); if (ret) { - if (next[nextToCompare]) + if (next[nextToCompare]) { if (!helper.isSameGroup(ret, next[nextToCompare])) throw MakeStringException(100, "GROUPED(%u), expected a group break between adjacent rows (rows %" I64F "d, %" I64F "d) ", activityId, processed+1, processed+2); @@ -4833,9 +4833,9 @@ void CHThorJoinActivity::ready() exclude = (helper.getJoinFlags() & JFexclude) != 0; getLimitType(helper.getJoinFlags(), limitFail, limitOnFail); if (rightOuterJoin && !defaultLeft) - createDefaultLeft(); + createDefaultLeft(); if ((leftOuterJoin || limitOnFail) && !defaultRight) - createDefaultRight(); + createDefaultRight(); betweenjoin = ((helper.getJoinFlags() & JFslidingmatch) != 0); assertex(!(betweenjoin && rightOuterJoin)); @@ -4848,7 +4848,7 @@ void CHThorJoinActivity::ready() else assertex(!rightOuterJoin && !betweenjoin); abortLimit = helper.getMatchAbortLimit(); - if (abortLimit == 0) + if (abortLimit == 0) abortLimit = (unsigned)-1; assertex((helper.getJoinFlags() & (JFfirst | JFfirstleft | JFfirstright)) == 0); // no longer supported @@ -4867,7 +4867,7 @@ void CHThorJoinActivity::ready() joinCounter = 0; failingLimit.clear(); state = JSfill; - if ((helper.getJoinFlags() & JFlimitedprefixjoin) && helper.getJoinLimit()) + if ((helper.getJoinFlags() & JFlimitedprefixjoin) && helper.getJoinLimit()) { //Limited Match Join (s[1..n]) limitedhelper.setown(createRHLimitedCompareHelper()); limitedhelper->init( helper.getJoinLimit(), groupedSortedRightInput, collate, helper.queryPrefixCompare() ); @@ -4973,7 +4973,7 @@ void CHThorJoinActivity::fillRight() } if(!rightOuterJoin && next && (!left || (collateupper->docompare(left, next) > 0))) // if right is less than left, and not right outer, can skip group { - while(next) + while(next) next.setown(groupedSortedRightInput->nextRow()); continue; } @@ -5009,7 +5009,7 @@ void CHThorJoinActivity::fillRight() { right.clear(); groupCount = 0; - while(next) + while(next) { next.setown(groupedSortedRightInput->nextRow()); } @@ -5020,7 +5020,7 @@ void CHThorJoinActivity::fillRight() groupCount++; } next.setown(groupedSortedRightInput->nextRow()); - + } // normally only want to read one right group, but if is between join and next right group is in window for left, need to continue if(betweenjoin && left) @@ -5274,7 +5274,7 @@ const void *CHThorJoinActivity::nextRow() } state = JSfillright; break; - + case JSleftonly: { const void * ret = NULL; @@ -5429,7 +5429,7 @@ stat_type CHThorJoinActivity::queryLocalCycles() const __int64 ret = CHThorActivityBase::queryLocalCycles(); if (input1) ret -= input1->queryTotalCycles(); - if (ret < 0) + if (ret < 0) ret = 0; return ret; } @@ -5463,7 +5463,7 @@ void CHThorSelfJoinActivity::ready() getLimitType(helper.getJoinFlags(), limitFail, limitOnFail); if (rightOuterJoin && !defaultLeft) { - if (!defaultAllocator) + if (!defaultAllocator) defaultAllocator.setown(agent.queryCodeContext()->getRowAllocator(input->queryOutputMeta(), activityId)); RtlDynamicRowBuilder rowBuilder(defaultAllocator); @@ -5472,7 +5472,7 @@ void CHThorSelfJoinActivity::ready() } if ((leftOuterJoin || limitOnFail) && !defaultRight) { - if (!defaultAllocator) + if (!defaultAllocator) defaultAllocator.setown(agent.queryCodeContext()->getRowAllocator(input->queryOutputMeta(), activityId)); RtlDynamicRowBuilder rowBuilder(defaultAllocator); @@ -5491,7 +5491,7 @@ void CHThorSelfJoinActivity::ready() else assertex(!rightOuterJoin); abortLimit = helper.getMatchAbortLimit(); - if (abortLimit == 0) + if (abortLimit == 0) abortLimit = (unsigned)-1; assertex((helper.getJoinFlags() & (JFfirst | JFfirstleft | JFfirstright)) == 0); // no longer supported @@ -5501,7 +5501,7 @@ void CHThorSelfJoinActivity::ready() eof = false; doneFirstFill = false; failingLimit.clear(); - if ((helper.getJoinFlags() & JFlimitedprefixjoin) && helper.getJoinLimit()) + if ((helper.getJoinFlags() & JFlimitedprefixjoin) && helper.getJoinLimit()) { //Limited Match Join (s[1..n]) dualcache.setown(new CRHDualCache()); dualcache->init(groupedInput); @@ -5560,7 +5560,7 @@ bool CHThorSelfJoinActivity::fillGroup() } group.clear(); groupCount = 0; - while(next) + while(next) next.setown(groupedInput->nextRow()); } else if(groupCount==atmostLimit) @@ -5576,7 +5576,7 @@ bool CHThorSelfJoinActivity::fillGroup() { group.clear(); groupCount = 0; - while(next) + while(next) next.setown(groupedInput->nextRow()); } } @@ -5618,7 +5618,7 @@ const void * CHThorSelfJoinActivity::nextRow() group.clear(); limitedhelper->getGroup(group,lhs); } - else + else eof = true; } @@ -5747,7 +5747,7 @@ const void * CHThorSelfJoinActivity::joinRecords(const void * curLeft, const voi { size32_t thisSize = (except ? helper.onFailTransform(outBuilder, curLeft, curRight, except, flags) : helper.transform(outBuilder, curLeft, curRight, counter, flags)); if(thisSize){ - return outBuilder.finalizeRowClear(thisSize); + return outBuilder.finalizeRowClear(thisSize); } else return NULL; @@ -5882,7 +5882,7 @@ void CHThorLookupJoinActivity::ready() getLimitType(helper.getJoinFlags(), limitFail, limitOnFail); if((leftOuterJoin || limitOnFail) && !defaultRight) - createDefaultRight(); + createDefaultRight(); eog = false; matchedGroup = false; joinCounter = 0; @@ -6252,7 +6252,7 @@ stat_type CHThorLookupJoinActivity::queryLocalCycles() const __int64 ret = CHThorActivityBase::queryLocalCycles(); if (input1) ret -= input1->queryTotalCycles(); - if (ret < 0) + if (ret < 0) ret = 0; return ret; } @@ -6274,7 +6274,7 @@ void CHThorAllJoinActivity::ready() leftOuterJoin = (helper.getJoinFlags() & JFleftouter) != 0; exclude = (helper.getJoinFlags() & JFexclude) != 0; if(leftOuterJoin && !defaultRight) - createDefaultRight(); + createDefaultRight(); if((helper.getJoinFlags() & (JFrightouter | JFfirst | JFfirstleft | JFfirstright)) != 0) throwUnexpected(); @@ -6397,7 +6397,7 @@ const void * CHThorAllJoinActivity::nextRow() loadRight(); } - const void * ret; + const void * ret; const void * right; if(eos) return NULL; @@ -6567,7 +6567,7 @@ stat_type CHThorAllJoinActivity::queryLocalCycles() const __int64 ret = CHThorActivityBase::queryLocalCycles(); if (input1) ret -= input1->queryTotalCycles(); - if (ret < 0) + if (ret < 0) ret = 0; return ret; } @@ -6581,7 +6581,7 @@ CHThorWorkUnitWriteActivity::CHThorWorkUnitWriteActivity(IAgentContext &_agent, static void throwWuResultTooLarge(size32_t outputLimit, IHThorWorkUnitWriteArg &helper) { - StringBuffer errMsg("Dataset too large to output to workunit (limit "); + StringBuffer errMsg("Dataset too large to output to workunit (limit "); errMsg.append(outputLimit/0x100000).append(" megabytes), in result ("); const char *name = helper.queryName(); if (name) @@ -6960,7 +6960,7 @@ void CHThorMultiInputActivity::updateProgress(IStatisticGatherer &progress) cons if (i) i->updateProgress(progress); } -} +} stat_type CHThorMultiInputActivity::queryLocalCycles() const { @@ -7371,7 +7371,7 @@ void CHThorDistributionActivity::execute() { MemoryAttr ma; IDistributionTable * * accumulator = (IDistributionTable * *)ma.allocate(helper.queryInternalRecordSize()->getMinRecordSize()); - helper.clearAggregate(accumulator); + helper.clearAggregate(accumulator); OwnedConstRoxieRow nextrec(input->nextRow()); for (;;) @@ -7474,8 +7474,8 @@ const void *CHThorWorkunitReadActivity::nextRow() processed = diskread->queryProcessed(); return ret; } - if (deserializer.eos()) - return NULL; + if (deserializer.eos()) + return NULL; if (eogPending) { @@ -7484,12 +7484,12 @@ const void *CHThorWorkunitReadActivity::nextRow() } RtlDynamicRowBuilder rowBuilder(rowAllocator); size32_t newSize = rowDeserializer->deserialize(rowBuilder, deserializer); - + if (grouped) deserializer.read(sizeof(bool), &eogPending); processed++; - return rowBuilder.finalizeRowClear(newSize); + return rowBuilder.finalizeRowClear(newSize); } //===================================================================================================== @@ -7882,10 +7882,10 @@ class CHThorMergeActivity : public CHThorMultiInputActivity merger.initInputs(inputs.length(), inputs.getArray()); } - virtual void stop() + virtual void stop() { merger.done(); - CHThorMultiInputActivity::stop(); + CHThorMultiInputActivity::stop(); } virtual const void * nextRow() @@ -8336,7 +8336,7 @@ void CHThorChildAggregateActivity::ready() CHThorChildGroupAggregateActivity::CHThorChildGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorChildGroupAggregateArg &_arg, ThorActivityKind _kind, EclGraph & _graph) : CHThorSimpleActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), - helper(_arg), + helper(_arg), aggregated(_arg, _arg) { } @@ -8360,7 +8360,7 @@ void CHThorChildGroupAggregateActivity::processRow(const void * next) { aggregated.addRow(next); } - + const void * CHThorChildGroupAggregateActivity::nextRow() { @@ -8434,9 +8434,9 @@ const void *CHThorChildThroughNormalizeActivity::nextRow() ok = helper.first(lastInput); } - + outBuilder.ensureRow(); - do + do { size32_t thisSize = helper.transform(outBuilder); if (thisSize) @@ -8477,9 +8477,9 @@ CHThorDiskReadBaseActivity::~CHThorDiskReadBaseActivity() close(); } -void CHThorDiskReadBaseActivity::ready() -{ - CHThorActivityBase::ready(); +void CHThorDiskReadBaseActivity::ready() +{ + CHThorActivityBase::ready(); grouped = false; fixedDiskRecordSize = 0; @@ -8575,7 +8575,7 @@ void CHThorDiskReadBaseActivity::checkFileType(IDistributedFile *file) if (isEmptyString(kind)) // file has no published kind, can't validate return; if (!strieq(kind, expectedType)) - { + { Owned e = makeStringExceptionV(ENGINEERR_FILE_TYPE_MISMATCH, "File format mismatch reading file: '%s'. Expected type '%s', but file is type '%s'", file->queryLogicalName(), expectedType, kind); if (!warningOnly) throw e.getClear(); @@ -8694,7 +8694,7 @@ void CHThorDiskReadBaseActivity::gatherInfo(IFileDescriptor * fileDesc) helper.getEncryptKey(kl,k); encryptionkey.setOwn(kl,k); - if (encryptionkey.length()!=0) + if (encryptionkey.length()!=0) { blockcompressed = true; compressed = true; @@ -8994,7 +8994,7 @@ bool CHThorDiskReadBaseActivity::openNext() if(compressed) { Owned eexp; - if (encryptionkey.length()) + if (encryptionkey.length()) eexp.setown(createAESExpander256((size32_t) encryptionkey.length(),encryptionkey.bufferBase())); inputfileio.setown(createCompressedFileReader(inputfile, eexp, useDefaultIoBufferSize, false, IFEnone)); if(!inputfileio && !blockcompressed) //fall back to old decompression, unless dfs marked as new @@ -9032,7 +9032,7 @@ bool CHThorDiskReadBaseActivity::openNext() bool CHThorDiskReadBaseActivity::checkOpenedFile(char const * filename, char const * filenamelist) { unsigned __int64 filesize = 0; - if (!inputfileio) + if (!inputfileio) { if (!(helper.getFlags() & TDRoptional)) { @@ -9040,9 +9040,9 @@ bool CHThorDiskReadBaseActivity::checkOpenedFile(char const * filename, char con if(filenamelist) { if (saveOpenExc.get()) { - if (strstr(mangledHelperFileName.str(),"::>")!=NULL) // if a 'special' filename just use saved exception + if (strstr(mangledHelperFileName.str(),"::>")!=NULL) // if a 'special' filename just use saved exception saveOpenExc->errorMessage(s); - else + else { s.append("Could not open logical file ").append(mangledHelperFileName.str()).append(" in any of these locations:").append(filenamelist).append(" ("); saveOpenExc->errorMessage(s).append(")"); @@ -9123,9 +9123,9 @@ void CHThorBinaryDiskReadBase::append(FFoption option, const IFieldFilter * filt fieldFilters.append(*filter); } -void CHThorBinaryDiskReadBase::ready() -{ - CHThorDiskReadBaseActivity::ready(); +void CHThorBinaryDiskReadBase::ready() +{ + CHThorDiskReadBaseActivity::ready(); fieldFilters.kill(); segHelper.createSegmentMonitors(this); } @@ -9179,9 +9179,9 @@ CHThorDiskReadActivity::CHThorDiskReadActivity(IAgentContext &_agent, unsigned _ lastGroupProcessed = 0; } -void CHThorDiskReadActivity::ready() -{ - PARENT::ready(); +void CHThorDiskReadActivity::ready() +{ + PARENT::ready(); outBuilder.setAllocator(rowAllocator); eogPending = false; lastGroupProcessed = processed; @@ -9196,9 +9196,9 @@ void CHThorDiskReadActivity::ready() void CHThorDiskReadActivity::stop() -{ +{ outBuilder.clear(); - PARENT::stop(); + PARENT::stop(); } @@ -9273,7 +9273,7 @@ const void *CHThorDiskReadActivity::nextRow() } else { - while(!eofseen && ((stopAfter == 0) || (processed - initialProcessed) < stopAfter)) + while(!eofseen && ((stopAfter == 0) || (processed - initialProcessed) < stopAfter)) { queryUpdateProgress(); @@ -9314,15 +9314,15 @@ CHThorDiskNormalizeActivity::CHThorDiskNormalizeActivity(IAgentContext &_agent, { } -void CHThorDiskNormalizeActivity::stop() -{ +void CHThorDiskNormalizeActivity::stop() +{ outBuilder.clear(); - PARENT::stop(); + PARENT::stop(); } -void CHThorDiskNormalizeActivity::ready() -{ - PARENT::ready(); +void CHThorDiskNormalizeActivity::ready() +{ + PARENT::ready(); outBuilder.setAllocator(rowAllocator); limit = helper.getRowLimit(); if (helper.getFlags() & TDRlimitskips) @@ -9344,7 +9344,7 @@ const void *CHThorDiskNormalizeActivity::nextRow() if (!opened) open(); for (;;) { - if (eofseen || (stopAfter && (processed - initialProcessed) >= stopAfter)) + if (eofseen || (stopAfter && (processed - initialProcessed) >= stopAfter)) break; for (;;) @@ -9439,15 +9439,15 @@ CHThorDiskAggregateActivity::CHThorDiskAggregateActivity(IAgentContext &_agent, { } -void CHThorDiskAggregateActivity::stop() -{ +void CHThorDiskAggregateActivity::stop() +{ outBuilder.clear(); - PARENT::stop(); + PARENT::stop(); } -void CHThorDiskAggregateActivity::ready() -{ - PARENT::ready(); +void CHThorDiskAggregateActivity::ready() +{ + PARENT::ready(); outBuilder.setAllocator(rowAllocator); finished = false; } @@ -9517,9 +9517,9 @@ CHThorDiskCountActivity::~CHThorDiskCountActivity() { } -void CHThorDiskCountActivity::ready() -{ - PARENT::ready(); +void CHThorDiskCountActivity::ready() +{ + PARENT::ready(); finished = false; stopAfter = helper.getChooseNLimit(); if (!helper.hasFilter()) @@ -9567,7 +9567,7 @@ const void *CHThorDiskCountActivity::nextRow() for (;;) { - if (eofseen) + if (eofseen) break; while (!prefetchBuffer.eos()) { @@ -9614,14 +9614,14 @@ const void *CHThorDiskCountActivity::nextRow() CHThorDiskGroupAggregateActivity::CHThorDiskGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskGroupAggregateArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node) : CHThorBinaryDiskReadBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _node, _graph), - helper(_arg), + helper(_arg), aggregated(_arg, _arg) { } -void CHThorDiskGroupAggregateActivity::ready() -{ - PARENT::ready(); +void CHThorDiskGroupAggregateActivity::ready() +{ + PARENT::ready(); eof = false; gathered = false; } @@ -9806,7 +9806,7 @@ bool CHThorCsvReadActivity::openNext() inputstream->skip(thisLineLength); } // only skip header in the first file - since spray doesn't duplicate the header. - headerLines = 0; + headerLines = 0; return true; } return false; @@ -10053,7 +10053,7 @@ CHThorLocalResultSpillActivity::CHThorLocalResultSpillActivity(IAgentContext &_a void CHThorLocalResultSpillActivity::ready() { - CHThorSimpleActivityBase::ready(); + CHThorSimpleActivityBase::ready(); result = graph->createResult(helper.querySequence(), LINK(rowAllocator)); nullPending = false; } @@ -10101,7 +10101,7 @@ void CHThorLocalResultSpillActivity::stop() result->addRowOwn(ret); } } - CHThorSimpleActivityBase::stop(); + CHThorSimpleActivityBase::stop(); } @@ -10125,7 +10125,7 @@ void CHThorLoopActivity::ready() curInput = &input->queryStream(); eof = false; loopCounter = 1; - CHThorSimpleActivityBase::ready(); + CHThorSimpleActivityBase::ready(); maxIterations = helper.numIterations(); if ((int)maxIterations < 0) maxIterations = 0; finishedLooping = ((kind == TAKloopcount) && (maxIterations == 0)); @@ -10162,7 +10162,7 @@ const void * CHThorLoopActivity::nextRow() } } - if (finishedLooping || + if (finishedLooping || ((flags & IHThorLoopArg::LFfiltered) && !helper.sendToLoop(loopCounter, ret))) { processed++; @@ -10248,7 +10248,7 @@ void CHThorLoopActivity::stop() ForEachItemIn(idx, loopPending) ReleaseRoxieRow(loopPending.item(idx)); loopPending.kill(); - CHThorSimpleActivityBase::stop(); + CHThorSimpleActivityBase::stop(); } //--------------------------------------------------------------------------- @@ -10365,7 +10365,7 @@ void CHThorGraphLoopActivity::ready() { executed = false; resultIndex = 0; - CHThorSimpleActivityBase::ready(); + CHThorSimpleActivityBase::ready(); maxIterations = helper.numIterations(); if ((int)maxIterations < 0) maxIterations = 0; loopResults.setown(agent.createGraphLoopResults()); @@ -10425,7 +10425,7 @@ void CHThorGraphLoopActivity::stop() rowAllocator.clear(); finalResult = NULL; loopResults.clear(); - CHThorSimpleActivityBase::stop(); + CHThorSimpleActivityBase::stop(); } //===================================================================================================== @@ -10441,7 +10441,7 @@ void CHThorParallelGraphLoopActivity::ready() { executed = false; resultIndex = 0; - CHThorSimpleActivityBase::ready(); + CHThorSimpleActivityBase::ready(); maxIterations = helper.numIterations(); if ((int)maxIterations < 0) maxIterations = 0; loopResults.setown(agent.createGraphLoopResults()); @@ -10491,7 +10491,7 @@ void CHThorParallelGraphLoopActivity::stop() rowAllocator.clear(); finalResult = NULL; loopResults.clear(); - CHThorSimpleActivityBase::stop(); + CHThorSimpleActivityBase::stop(); } //===================================================================================================== @@ -10632,7 +10632,7 @@ void CHThorLibraryCallActivity::stop() if (state != StateDone) { results.clear(); - CHThorSimpleActivityBase::stop(); + CHThorSimpleActivityBase::stop(); } } @@ -10733,9 +10733,9 @@ class CHThorNWayGraphLoopResultReadActivity : public CHThorSimpleActivityBase, i graphId = _graphId; } - virtual bool isGrouped() - { - return grouped; + virtual bool isGrouped() + { + return grouped; } virtual void ready()