Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions ecl/eclagent/agentctx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,17 @@ interface IOrderedOutputSerializer;
typedef enum { ofSTD, ofXML, ofRAW } outputFmts;
enum class AccessMode : unsigned;

struct IAgentContext : extends IGlobalCodeContext
interface ITempFileHandler
{
// name is the name/id of the temporary file, not the filename on disk.
virtual const char *noteTemporaryFile(const char *name) = 0;
// name is the name/id of the temporary file, not the filename on disk.
virtual const char *queryTemporaryFile(const char *name) = 0;
// fname is the temporary filename on disk returned from noteTemporaryFile or queryTemporaryFile.
virtual void removeTemporaryFile(const char *fname) = 0;
};

struct IAgentContext : extends IGlobalCodeContext, extends ITempFileHandler
{
virtual void reportProgress(const char *msg, unsigned flags=0) = 0;
virtual bool queryResolveFilesLocally() = 0;
Expand All @@ -92,12 +102,9 @@ struct IAgentContext : extends IGlobalCodeContext

virtual IConstWorkUnit *queryWorkUnit() const = 0;
virtual IWorkUnit *updateWorkUnit() const = 0;

virtual ILocalOrDistributedFile *resolveLFN(const char *logicalName, const char *errorTxt, bool optional, bool noteRead, AccessMode accessMode, StringBuffer * expandedlfn, bool isPrivilegedUser) = 0;
virtual StringBuffer & getTempfileBase(StringBuffer & buff) = 0;
virtual const char *noteTemporaryFile(const char *fname) = 0;
virtual const char *noteTemporaryFilespec(const char *fname) = 0;
virtual const char *queryTemporaryFile(const char *fname) = 0;
virtual void reloadWorkUnit() = 0;

virtual char *resolveName(const char *in, char *out, unsigned outlen) = 0;
Expand All @@ -110,14 +117,14 @@ struct IAgentContext : extends IGlobalCodeContext
virtual void outputFormattedResult(const char *name, unsigned sequence, bool close) = 0;

virtual const char *queryAllowedPipePrograms() = 0;

virtual IOrderedOutputSerializer * queryOutputSerializer() = 0;

virtual IGroup *getHThorGroup(StringBuffer &grpnameout) = 0;

virtual RecordTranslationMode getLayoutTranslationMode() const = 0;
virtual unsigned __int64 queryStopAfter() = 0;

virtual const char *queryWuid() = 0;

virtual void updateWULogfile(IWorkUnit *outputWU) = 0;
Expand Down
84 changes: 59 additions & 25 deletions ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,51 +610,85 @@ const char *EclAgent::queryTempfilePath()
return agentTempDir.str();
}

StringBuffer & EclAgent::getTempfileBase(StringBuffer & buff)
StringBuffer &EclAgent::getTempfileBase(StringBuffer &buff)
{
return buff.append(queryTempfilePath()).append(PATHSEPCHAR).appendLower(wuid);
}

const char *EclAgent::queryTemporaryFile(const char *fname)
void EclAgent::buildTempFilename(StringBuffer &tempFilename, const char *name)
{
StringBuffer tempfilename;
getTempfileBase(tempfilename).append(PATHSEPCHAR).append(fname);
CriticalBlock crit(tfsect);
ForEachItemIn(idx, tempFiles)
getTempfileBase(tempFilename).append(PATHSEPCHAR).append(name);
}

const char *EclAgent::queryTemporaryFile(const char *name)
{
dbgassertex(!isEmptyString(name));

StringBuffer tempFilename;
buildTempFilename(tempFilename, name);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move as much as possible outside of the crit, i.e. if it can't contend, don't keep it a mutex.
It's semi needed now, because buildTempFilename does a one-time initialization ("tempFilePrefix"), however, this mutex should really be protecting the std::set only.
See comment re. one-time initialization of the prefix above.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code in tfsect critical section moved out where possible in 591f31b


{
if (strcmp(tempFiles.item(idx), tempfilename.str())==0)
return tempFiles.item(idx);
CriticalBlock crit(tfsect);

auto it = tempFileSet.find(tempFilename.str());
if (it != tempFileSet.end())
return it->c_str();
}
StringBuffer errmsg;
errmsg.append("Attempt to read temp file that has not yet been registered: ").append(tempfilename);

VStringBuffer errmsg("Attempt to read temp file that has not yet been registered: %s", name);
fail(0, errmsg.str());
return 0;
return nullptr;
}

const char *EclAgent::noteTemporaryFile(const char *fname)
const char *EclAgent::noteTemporaryFile(const char *name)
{
StringBuffer tempfilename;
getTempfileBase(tempfilename).append(PATHSEPCHAR).append(fname);
CriticalBlock crit(tfsect);
tempFiles.append(tempfilename.str());
return tempFiles.item(tempFiles.length()-1);
dbgassertex(!isEmptyString(name));

StringBuffer tempFilename;
buildTempFilename(tempFilename, name);

std::pair<std::set<std::string>::iterator, bool> inserted;
{
CriticalBlock crit(tfsect);

inserted = tempFileSet.emplace(tempFilename.str());
}

// The returned pointer refers to the std::string stored in tempFileSet and
// is only valid while that entry remains in the set.

if (!inserted.second)
{
VStringBuffer errmsg("Temp file already registered: %s", name);
fail(0, errmsg.str());
return nullptr;
}

return inserted.first->c_str();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is thread-safe afaics, as long as tempFileSet entries are not removed or altered (which should be the case). It is probably worth adding a clarifying comment to make it clear to the casual reader of the semantics.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment added in a4755f1

}

const char *EclAgent::noteTemporaryFilespec(const char *fspec)
void EclAgent::removeTemporaryFile(const char *fname)
{
dbgassertex(!isEmptyString(fname));

CriticalBlock crit(tfsect);
tempFiles.append(fspec);
return tempFiles.item(tempFiles.length()-1);

auto it = tempFileSet.find(std::string(fname));
dbgassertex(it != tempFileSet.end());
if (it != tempFileSet.end())
{
remove(it->c_str());
tempFileSet.erase(it);
}
}

void EclAgent::deleteTempFiles()
{
CriticalBlock crit(tfsect);
ForEachItemIn(idx, tempFiles)
{
remove(tempFiles.item(idx));
}
tempFiles.kill();

for (const auto& f : tempFileSet)
remove(f.c_str());
tempFileSet.clear();
}

const char *EclAgent::loadResource(unsigned id)
Expand Down
24 changes: 13 additions & 11 deletions ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include "thorcommon.hpp"
#include "enginecontext.hpp"

#include <set>

#define MAX_EDGEDATA_LENGTH 30000
#define MAX_HEX_SIZE 500

Expand Down Expand Up @@ -172,17 +174,17 @@ public:
{
return ctx->getTempfileBase(buff);
}
virtual const char *noteTemporaryFile(const char *fname)
virtual const char *noteTemporaryFile(const char *name) override
{
return ctx->noteTemporaryFile(fname);
return ctx->noteTemporaryFile(name);
}
virtual const char *noteTemporaryFilespec(const char *fspec)
virtual const char *queryTemporaryFile(const char *name) override
{
return ctx->noteTemporaryFilespec(fspec);
return ctx->queryTemporaryFile(name);
}
virtual const char *queryTemporaryFile(const char *fname)
virtual void removeTemporaryFile(const char *fname) override
{
return ctx->queryTemporaryFile(fname);
ctx->removeTemporaryFile(fname);
}
virtual void reloadWorkUnit()
{
Expand Down Expand Up @@ -350,7 +352,6 @@ public:
virtual bool onDebuggerTimeout();
};


class CHThorDebugContext;
class EclAgent : implements IAgentContext, implements ICodeContext, implements IRowAllocatorMetaActIdCacheCallback, implements IEngineContext, public CInterface
{
Expand Down Expand Up @@ -378,7 +379,7 @@ private:
outputFmts outputFmt = ofSTD;
unsigned __int64 stopAfter;
mutable CriticalSection wusect;
StringArray tempFiles;
std::set<std::string> tempFileSet; // Set of actual temp file names on disk
CriticalSection tfsect;
IArray persistReadLocks;
StringArray processedPersists;
Expand All @@ -404,6 +405,7 @@ private:

private:
void doSetResultString(type_t type, const char * stepname, unsigned sequence, int len, const char *val);
void buildTempFilename(StringBuffer & tempFilename, const char *filename);
IEclProcess *loadProcess();
StringBuffer & getTempfileBase(StringBuffer & buff);
const char *queryTempfilePath();
Expand Down Expand Up @@ -615,9 +617,9 @@ public:

virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash);
virtual void reportProgress(const char *msg, unsigned flags=0);
virtual const char *noteTemporaryFile(const char *fname);
virtual const char *noteTemporaryFilespec(const char *fspec);
virtual const char *queryTemporaryFile(const char *fname);
virtual const char *noteTemporaryFile(const char *name) override;
virtual const char *queryTemporaryFile(const char *name) override;
virtual void removeTemporaryFile(const char *fname) override;
virtual void deleteFile(const char * logicalName);

void addException(ErrorSeverity severity, const char * source, unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool failOnError, bool isAbort);
Expand Down
Loading
Loading