Skip to content

Commit 9853cf0

Browse files
authored
Merge pull request #54 from Telecominfraproject/main
https://telecominfraproject.atlassian.net/browse/WIFI-13147
2 parents ff052a9 + 3c68b1a commit 9853cf0

8 files changed

Lines changed: 184 additions & 48 deletions

File tree

build

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
2
1+
6

src/framework/EventBusManager.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99

1010
namespace OpenWifi {
1111

12-
EventBusManager::EventBusManager(Poco::Logger &L) : Logger_(L) {}
13-
1412
void EventBusManager::run() {
1513
Running_ = true;
1614
Utils::SetThreadName("fmwk:EventMgr");

src/framework/EventBusManager.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,16 @@ namespace OpenWifi {
1212

1313
class EventBusManager : public Poco::Runnable {
1414
public:
15+
EventBusManager() :
16+
Logger_(Poco::Logger::create(
17+
"EventBusManager", Poco::Logger::root().getChannel(), Poco::Logger::root().getLevel())) {
18+
}
19+
20+
static auto instance() {
21+
static auto instance_ = new EventBusManager;
22+
return instance_;
23+
}
24+
1525
explicit EventBusManager(Poco::Logger &L);
1626
void run() final;
1727
void Start();
@@ -24,4 +34,6 @@ namespace OpenWifi {
2434
Poco::Logger &Logger_;
2535
};
2636

37+
inline auto EventBusManager() { return EventBusManager::instance(); }
38+
2739
} // namespace OpenWifi

src/framework/MicroService.cpp

Lines changed: 78 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,23 @@ namespace OpenWifi {
3333

3434
void MicroService::Exit(int Reason) { std::exit(Reason); }
3535

36+
static std::string MakeServiceListString(const Types::MicroServiceMetaMap &Services) {
37+
std::string SvcList;
38+
for (const auto &Svc : Services) {
39+
if (SvcList.empty())
40+
SvcList = Svc.second.Type;
41+
else
42+
SvcList += ", " + Svc.second.Type;
43+
}
44+
return SvcList;
45+
}
46+
3647
void MicroService::BusMessageReceived([[maybe_unused]] const std::string &Key,
3748
const std::string &Payload) {
3849
std::lock_guard G(InfraMutex_);
50+
51+
Poco::Logger &BusLogger = EventBusManager()->Logger();
52+
3953
try {
4054
Poco::JSON::Parser P;
4155
auto Object = P.parse(Payload).extract<Poco::JSON::Object::Ptr>();
@@ -55,28 +69,18 @@ namespace OpenWifi {
5569
Object->has(KafkaTopics::ServiceEvents::Fields::KEY)) {
5670
auto PrivateEndPoint =
5771
Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE).toString();
58-
if (Event == KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE &&
59-
Services_.find(PrivateEndPoint) != Services_.end()) {
60-
Services_[PrivateEndPoint].LastUpdate = Utils::Now();
61-
} else if (Event == KafkaTopics::ServiceEvents::EVENT_LEAVE) {
72+
if (Event == KafkaTopics::ServiceEvents::EVENT_LEAVE) {
6273
Services_.erase(PrivateEndPoint);
63-
poco_debug(
64-
logger(),
74+
poco_information(
75+
BusLogger,
6576
fmt::format(
6677
"Service {} ID={} leaving system.",
6778
Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE)
6879
.toString(),
6980
ID));
7081
} else if (Event == KafkaTopics::ServiceEvents::EVENT_JOIN ||
7182
Event == KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE) {
72-
poco_debug(
73-
logger(),
74-
fmt::format(
75-
"Service {} ID={} joining system.",
76-
Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE)
77-
.toString(),
78-
ID));
79-
Services_[PrivateEndPoint] = Types::MicroServiceMeta{
83+
auto ServiceInfo = Types::MicroServiceMeta{
8084
.Id = ID,
8185
.Type = Poco::toLower(
8286
Object->get(KafkaTopics::ServiceEvents::Fields::TYPE)
@@ -94,20 +98,46 @@ namespace OpenWifi {
9498
.toString(),
9599
.LastUpdate = Utils::Now()};
96100

97-
std::string SvcList;
98-
for (const auto &Svc : Services_) {
99-
if (SvcList.empty())
100-
SvcList = Svc.second.Type;
101-
else
102-
SvcList += ", " + Svc.second.Type;
101+
auto s1 = MakeServiceListString(Services_);
102+
auto PreviousSize = Services_.size();
103+
Services_[PrivateEndPoint] = ServiceInfo;
104+
auto CurrentSize = Services_.size();
105+
if(Event == KafkaTopics::ServiceEvents::EVENT_JOIN) {
106+
if(!s1.empty()) {
107+
poco_information(
108+
BusLogger,
109+
fmt::format(
110+
"Service {} ID={} is joining the system.",
111+
Object
112+
->get(
113+
KafkaTopics::ServiceEvents::Fields::PRIVATE)
114+
.toString(),
115+
ID));
116+
}
117+
std::string SvcList;
118+
for (const auto &Svc : Services_) {
119+
if (SvcList.empty())
120+
SvcList = Svc.second.Type;
121+
else
122+
SvcList += ", " + Svc.second.Type;
123+
}
124+
poco_information(
125+
BusLogger,
126+
fmt::format("Current list of microservices: {}", SvcList));
127+
} else if(CurrentSize!=PreviousSize) {
128+
poco_information(
129+
BusLogger,
130+
fmt::format(
131+
"Service {} ID={} is being added back in.",
132+
Object
133+
->get(KafkaTopics::ServiceEvents::Fields::PRIVATE)
134+
.toString(),
135+
ID));
103136
}
104-
poco_information(
105-
logger(),
106-
fmt::format("Current list of microservices: {}", SvcList));
107137
}
108138
} else {
109-
poco_error(
110-
logger(),
139+
poco_information(
140+
BusLogger,
111141
fmt::format("KAFKA-MSG: invalid event '{}', missing a field.",
112142
Event));
113143
}
@@ -118,32 +148,39 @@ namespace OpenWifi {
118148
Object->get(KafkaTopics::ServiceEvents::Fields::TOKEN).toString());
119149
#endif
120150
} else {
121-
poco_error(
122-
logger(),
151+
poco_information(
152+
BusLogger,
123153
fmt::format("KAFKA-MSG: invalid event '{}', missing token", Event));
124154
}
125155
} else {
126-
poco_error(logger(),
156+
poco_information(BusLogger,
127157
fmt::format("Unknown Event: {} Source: {}", Event, ID));
128158
}
129159
}
130160
} else {
131-
poco_error(logger(), "Bad bus message.");
132-
std::ostringstream os;
133-
Object->stringify(std::cout);
161+
std::ostringstream os;
162+
Object->stringify(std::cout);
163+
poco_error(BusLogger, fmt::format("Bad bus message: {}", os.str()));
134164
}
135165

136-
auto i = Services_.begin();
166+
auto ServiceHint = Services_.begin();
137167
auto now = Utils::Now();
138-
for (; i != Services_.end();) {
139-
if ((now - i->second.LastUpdate) > 60) {
140-
i = Services_.erase(i);
168+
auto si1 = Services_.size();
169+
auto ss1 = MakeServiceListString(Services_);
170+
while(ServiceHint!=Services_.end()) {
171+
if ((now - ServiceHint->second.LastUpdate) > 120) {
172+
poco_information(BusLogger, fmt::format("ZombieService: Removing service {}, ", ServiceHint->second.PublicEndPoint));
173+
ServiceHint = Services_.erase(ServiceHint);
141174
} else
142-
++i;
175+
++ServiceHint;
143176
}
177+
if(Services_.size() != si1) {
178+
auto ss2 = MakeServiceListString(Services_);
179+
poco_information(BusLogger, fmt::format("Current list of microservices: {} -> {}", ss1, ss2));
180+
}
144181

145182
} catch (const Poco::Exception &E) {
146-
logger().log(E);
183+
BusLogger.log(E);
147184
}
148185
}
149186

@@ -412,7 +449,7 @@ namespace OpenWifi {
412449
try {
413450
DataDir.createDirectory();
414451
} catch (const Poco::Exception &E) {
415-
logger().log(E);
452+
Logger_.log(E);
416453
}
417454
}
418455
WWWAssetsDir_ = ConfigPath("openwifi.restapi.wwwassets", "");
@@ -530,14 +567,12 @@ namespace OpenWifi {
530567
for (auto i : SubSystems_) {
531568
i->Start();
532569
}
533-
EventBusManager_ = std::make_unique<EventBusManager>(Poco::Logger::create(
534-
"EventBusManager", Poco::Logger::root().getChannel(), Poco::Logger::root().getLevel()));
535-
EventBusManager_->Start();
570+
EventBusManager()->Start();
536571
}
537572

538573
void MicroService::StopSubSystemServers() {
539574
AddActivity("Stopping");
540-
EventBusManager_->Stop();
575+
EventBusManager()->Stop();
541576
for (auto i = SubSystems_.rbegin(); i != SubSystems_.rend(); ++i) {
542577
(*i)->Stop();
543578
}
@@ -697,7 +732,7 @@ namespace OpenWifi {
697732
auto APIKEY = Request.get("X-API-KEY");
698733
return APIKEY == MyHash_;
699734
} catch (const Poco::Exception &E) {
700-
logger().log(E);
735+
Logger_.log(E);
701736
}
702737
return false;
703738
}

src/framework/MicroService.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,6 @@ namespace OpenWifi {
201201
Poco::JWT::Signer Signer_;
202202
Poco::Logger &Logger_;
203203
Poco::ThreadPool TimerPool_{"timer:pool", 2, 32};
204-
std::unique_ptr<EventBusManager> EventBusManager_;
205204
};
206205

207206
inline MicroService *MicroService::instance_ = nullptr;

src/framework/StorageClass.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ namespace OpenWifi {
4747

4848
}
4949

50+
Poco::Data::SessionPool &Pool() { return *Pool_; }
51+
5052
private:
5153
inline int Setup_SQLite();
5254
inline int Setup_MySQL();

src/framework/orm.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,8 +576,8 @@ namespace ORM {
576576
bool UpdateRecord(field_name_t FieldName, const T &Value, const RecordType &R) {
577577
try {
578578
assert(ValidFieldName(FieldName));
579-
580579
Poco::Data::Session Session = Pool_.get();
580+
Session.begin();
581581
Poco::Data::Statement Update(Session);
582582

583583
RecordTuple RT;
@@ -593,6 +593,7 @@ namespace ORM {
593593
Update.execute();
594594
if (Cache_)
595595
Cache_->UpdateCache(R);
596+
Session.commit();
596597
return true;
597598
} catch (const Poco::Exception &E) {
598599
Logger_.log(E);
@@ -662,6 +663,7 @@ namespace ORM {
662663
assert(ValidFieldName(FieldName));
663664

664665
Poco::Data::Session Session = Pool_.get();
666+
Session.begin();
665667
Poco::Data::Statement Delete(Session);
666668

667669
std::string St = "delete from " + TableName_ + " where " + FieldName + "=?";
@@ -671,6 +673,7 @@ namespace ORM {
671673
Delete.execute();
672674
if (Cache_)
673675
Cache_->Delete(FieldName, Value);
676+
Session.commit();
674677
return true;
675678
} catch (const Poco::Exception &E) {
676679
Logger_.log(E);
@@ -682,11 +685,13 @@ namespace ORM {
682685
try {
683686
assert(!WhereClause.empty());
684687
Poco::Data::Session Session = Pool_.get();
688+
Session.begin();
685689
Poco::Data::Statement Delete(Session);
686690

687691
std::string St = "delete from " + TableName_ + " where " + WhereClause;
688692
Delete << St;
689693
Delete.execute();
694+
Session.commit();
690695
return true;
691696
} catch (const Poco::Exception &E) {
692697
Logger_.log(E);

src/framework/utils.h

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,5 +316,90 @@ namespace OpenWifi::Utils {
316316
uint32_t Port;
317317
};
318318

319+
class CompressedString {
320+
public:
321+
CompressedString() {
322+
DecompressedSize_ = 0;
323+
};
324+
325+
explicit CompressedString(const std::string &Data) : DecompressedSize_(Data.size()) {
326+
CompressIt(Data);
327+
}
328+
329+
CompressedString(const CompressedString &Data) {
330+
this->DecompressedSize_ = Data.DecompressedSize_;
331+
this->CompressedData_ = Data.CompressedData_;
332+
}
333+
334+
CompressedString& operator=(const CompressedString& rhs) {
335+
if (this != &rhs) {
336+
this->DecompressedSize_ = rhs.DecompressedSize_;
337+
this->CompressedData_ = rhs.CompressedData_;
338+
}
339+
return *this;
340+
}
341+
342+
CompressedString& operator=(CompressedString&& rhs) {
343+
if (this != &rhs) {
344+
this->DecompressedSize_ = rhs.DecompressedSize_;
345+
this->CompressedData_ = rhs.CompressedData_;
346+
}
347+
return *this;
348+
}
349+
350+
~CompressedString() = default;
351+
352+
operator std::string() const {
353+
return DecompressIt();
354+
}
355+
356+
CompressedString &operator=(const std::string &Data) {
357+
DecompressedSize_ = Data.size();
358+
CompressIt(Data);
359+
return *this;
360+
}
361+
362+
auto CompressedSize() const { return CompressedData_.size(); }
363+
auto DecompressedSize() const { return DecompressedSize_; }
364+
365+
private:
366+
std::string CompressedData_;
367+
std::size_t DecompressedSize_;
368+
369+
inline void CompressIt(const std::string &Data) {
370+
z_stream strm; // = {0};
371+
CompressedData_.resize(Data.size());
372+
strm.next_in = (Bytef *)Data.data();
373+
strm.avail_in = Data.size();
374+
strm.next_out = (Bytef *)CompressedData_.data();
375+
strm.avail_out = Data.size();
376+
strm.zalloc = Z_NULL;
377+
strm.zfree = Z_NULL;
378+
strm.opaque = Z_NULL;
379+
deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY);
380+
deflate(&strm, Z_FINISH);
381+
deflateEnd(&strm);
382+
CompressedData_.resize(strm.total_out);
383+
}
384+
385+
[[nodiscard]] std::string DecompressIt() const {
386+
std::string Result;
387+
if(DecompressedSize_!=0) {
388+
Result.resize(DecompressedSize_);
389+
z_stream strm ; //= {0};
390+
strm.next_in = (Bytef *)CompressedData_.data();
391+
strm.avail_in = CompressedData_.size();
392+
strm.next_out = (Bytef *)Result.data();
393+
strm.avail_out = Result.size();
394+
strm.zalloc = Z_NULL;
395+
strm.zfree = Z_NULL;
396+
strm.opaque = Z_NULL;
397+
inflateInit2(&strm, 15 + 32);
398+
inflate(&strm, Z_FINISH);
399+
inflateEnd(&strm);
400+
}
401+
return Result;
402+
}
403+
};
319404

320405
} // namespace OpenWifi::Utils

0 commit comments

Comments
 (0)