Skip to content

Commit 437fa2f

Browse files
committed
Update nfpcapd - split node into hotNode and coldNode
1 parent 7c5b47d commit 437fa2f

5 files changed

Lines changed: 318 additions & 325 deletions

File tree

src/nfpcapd/flowdump.c

Lines changed: 45 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -105,26 +105,26 @@ static int StorePcapFlow(flowParam_t *flowParam, struct FlowNode *Node) {
105105
// pack V3 record
106106
UpdateRecordSize(EXgenericFlowSize);
107107
PushExtension(recordHeader, EXgenericFlow, genericFlow);
108-
genericFlow->msecFirst = (1000LL * (uint64_t)Node->t_first.tv_sec) + (uint64_t)Node->t_first.tv_usec / 1000LL;
109-
genericFlow->msecLast = (1000LL * (uint64_t)Node->t_last.tv_sec) + (uint64_t)Node->t_last.tv_usec / 1000LL;
108+
genericFlow->msecFirst = (1000LL * (uint64_t)Node->hotNode.t_first.tv_sec) + (uint64_t)Node->hotNode.t_first.tv_usec / 1000LL;
109+
genericFlow->msecLast = (1000LL * (uint64_t)Node->hotNode.t_last.tv_sec) + (uint64_t)Node->hotNode.t_last.tv_usec / 1000LL;
110110

111111
struct timeval now;
112112
gettimeofday(&now, NULL);
113113
genericFlow->msecReceived = (uint64_t)now.tv_sec * 1000LL + (uint64_t)now.tv_usec / 1000LL;
114114

115-
genericFlow->inPackets = Node->packets;
116-
genericFlow->inBytes = Node->bytes;
115+
genericFlow->inPackets = Node->hotNode.packets;
116+
genericFlow->inBytes = Node->hotNode.bytes;
117117

118-
genericFlow->tcpFlags = Node->flags;
119-
genericFlow->proto = Node->flowKey.proto;
120-
genericFlow->srcPort = Node->flowKey.src_port;
121-
genericFlow->dstPort = Node->flowKey.dst_port;
118+
genericFlow->tcpFlags = Node->hotNode.flags;
119+
genericFlow->proto = Node->hotNode.flowKey.proto;
120+
genericFlow->srcPort = Node->hotNode.flowKey.src_port;
121+
genericFlow->dstPort = Node->hotNode.flowKey.dst_port;
122122

123-
if (Node->flowKey.version == AF_INET6) {
123+
if (Node->hotNode.flowKey.version == AF_INET6) {
124124
UpdateRecordSize(EXipv6FlowSize);
125125
PushExtension(recordHeader, EXipv6Flow, ipv6Flow);
126-
uint64_t *src = (uint64_t *)Node->flowKey.src_addr.bytes;
127-
uint64_t *dst = (uint64_t *)Node->flowKey.dst_addr.bytes;
126+
uint64_t *src = (uint64_t *)Node->hotNode.flowKey.src_addr.bytes;
127+
uint64_t *dst = (uint64_t *)Node->hotNode.flowKey.dst_addr.bytes;
128128
ipv6Flow->srcAddr[0] = ntohll(src[0]);
129129
ipv6Flow->srcAddr[1] = ntohll(src[1]);
130130
ipv6Flow->dstAddr[0] = ntohll(dst[0]);
@@ -133,53 +133,53 @@ static int StorePcapFlow(flowParam_t *flowParam, struct FlowNode *Node) {
133133
UpdateRecordSize(EXipv4FlowSize);
134134
PushExtension(recordHeader, EXipv4Flow, ipv4Flow);
135135
uint32_t ipv4;
136-
memcpy(&ipv4, Node->flowKey.src_addr.bytes + 12, 4);
136+
memcpy(&ipv4, Node->hotNode.flowKey.src_addr.bytes + 12, 4);
137137
ipv4Flow->srcAddr = ntohl(ipv4);
138-
memcpy(&ipv4, Node->flowKey.dst_addr.bytes + 12, 4);
138+
memcpy(&ipv4, Node->hotNode.flowKey.dst_addr.bytes + 12, 4);
139139
ipv4Flow->dstAddr = ntohl(ipv4);
140140
}
141141

142142
if (flowParam->extendedFlow) {
143143
UpdateRecordSize(EXipInfoSize);
144144
PushExtension(recordHeader, EXipInfo, ipInfo);
145-
ipInfo->minTTL = Node->minTTL;
146-
ipInfo->maxTTL = Node->maxTTL;
147-
ipInfo->fragmentFlags = Node->fragmentFlags;
145+
ipInfo->minTTL = Node->hotNode.minTTL;
146+
ipInfo->maxTTL = Node->hotNode.maxTTL;
147+
ipInfo->fragmentFlags = Node->coldNode.fragmentFlags;
148148

149-
if (Node->vlanID) {
149+
if (Node->coldNode.vlanID) {
150150
UpdateRecordSize(EXvLanSize);
151151
PushExtension(recordHeader, EXvLan, vlan);
152-
vlan->srcVlan = Node->vlanID;
152+
vlan->srcVlan = Node->coldNode.vlanID;
153153
}
154154

155-
if (Node->srcMac) {
155+
if (Node->coldNode.srcMac) {
156156
UpdateRecordSize(EXmacAddrSize);
157157
PushExtension(recordHeader, EXmacAddr, macAddr);
158-
macAddr->inSrcMac = ntohll(Node->srcMac) >> 16;
159-
macAddr->outDstMac = ntohll(Node->dstMac) >> 16;
158+
macAddr->inSrcMac = ntohll(Node->coldNode.srcMac) >> 16;
159+
macAddr->outDstMac = ntohll(Node->coldNode.dstMac) >> 16;
160160
macAddr->inDstMac = 0;
161161
macAddr->outSrcMac = 0;
162162
}
163163

164-
if (Node->mpls[0]) {
164+
if (Node->coldNode.mpls[0]) {
165165
UpdateRecordSize(EXmplsLabelSize);
166166
PushExtension(recordHeader, EXmplsLabel, mplsLabel);
167-
for (int i = 0; Node->mpls[i] != 0; i++) {
168-
mplsLabel->mplsLabel[i] = ntohl(Node->mpls[i]) >> 8;
167+
for (int i = 0; Node->coldNode.mpls[i] != 0; i++) {
168+
mplsLabel->mplsLabel[i] = ntohl(Node->coldNode.mpls[i]) >> 8;
169169
}
170170
}
171171

172-
if (Node->flowKey.proto == IPPROTO_TCP && Node->latency.application) {
172+
if (Node->hotNode.flowKey.proto == IPPROTO_TCP && Node->coldNode.latency.application) {
173173
UpdateRecordSize(EXlatencySize);
174174
PushExtension(recordHeader, EXlatency, latency);
175-
latency->usecClientNwDelay = Node->latency.client;
176-
latency->usecServerNwDelay = Node->latency.server;
177-
latency->usecApplLatency = Node->latency.application;
178-
dbg_printf("Node RTT: %u\n", Node->latency.rtt);
175+
latency->usecClientNwDelay = Node->coldNode.latency.client;
176+
latency->usecServerNwDelay = Node->coldNode.latency.server;
177+
latency->usecApplLatency = Node->coldNode.latency.application;
178+
dbg_printf("Node RTT: %u\n", Node->coldNode.latency.rtt);
179179
}
180180

181-
if (Node->pflog) {
182-
pflog_hdr_t *pflog = (pflog_hdr_t *)Node->pflog;
181+
if (Node->coldNode.pflog) {
182+
pflog_hdr_t *pflog = (pflog_hdr_t *)Node->coldNode.pflog;
183183
size_t ifnameLen = strnlen(pflog->ifname, IFNAMSIZ);
184184
if (ifnameLen) {
185185
ifnameLen++; // add terminating '\0'
@@ -205,38 +205,38 @@ static int StorePcapFlow(flowParam_t *flowParam, struct FlowNode *Node) {
205205
}
206206

207207
if (flowParam->addPayload) {
208-
if (Node->payloadSize) {
209-
size_t payloadSize = Node->payloadSize;
208+
if (Node->coldNode.payloadSize) {
209+
size_t payloadSize = Node->coldNode.payloadSize;
210210
size_t align = payloadSize & 0x3;
211211
if (align) {
212212
payloadSize += (4 - align);
213213
}
214214

215215
UpdateRecordSize(EXinPayloadSize + payloadSize);
216216
PushVarLengthPointer(recordHeader, EXinPayload, inPayload, payloadSize);
217-
memcpy(inPayload, Node->payload, Node->payloadSize);
217+
memcpy(inPayload, Node->coldNode.payload, Node->coldNode.payloadSize);
218218
}
219219
}
220220

221-
if (Node->tun_ip_version == AF_INET) {
221+
if (Node->coldNode.tun_ip_version == AF_INET) {
222222
UpdateRecordSize(EXtunIPv4Size);
223223
PushExtension(recordHeader, EXtunIPv4, tunIPv4);
224224
uint32_t ipv4;
225-
memcpy(&ipv4, Node->tun_src_addr.bytes + 12, 4);
225+
memcpy(&ipv4, Node->coldNode.tun_src_addr.bytes + 12, 4);
226226
tunIPv4->tunSrcAddr = ntohl(ipv4);
227-
memcpy(&ipv4, Node->tun_dst_addr.bytes + 12, 4);
227+
memcpy(&ipv4, Node->coldNode.tun_dst_addr.bytes + 12, 4);
228228
tunIPv4->tunDstAddr = ntohl(ipv4);
229-
tunIPv4->tunProto = Node->tun_proto;
230-
} else if (Node->tun_ip_version == AF_INET6) {
229+
tunIPv4->tunProto = Node->coldNode.tun_proto;
230+
} else if (Node->coldNode.tun_ip_version == AF_INET6) {
231231
UpdateRecordSize(EXtunIPv6Size);
232232
PushExtension(recordHeader, EXtunIPv6, tunIPv6);
233-
uint64_t *src = (uint64_t *)Node->tun_src_addr.bytes;
234-
uint64_t *dst = (uint64_t *)Node->tun_dst_addr.bytes;
233+
uint64_t *src = (uint64_t *)Node->coldNode.tun_src_addr.bytes;
234+
uint64_t *dst = (uint64_t *)Node->coldNode.tun_dst_addr.bytes;
235235
tunIPv6->tunSrcAddr[0] = ntohll(src[0]);
236236
tunIPv6->tunSrcAddr[1] = ntohll(src[1]);
237237
tunIPv6->tunDstAddr[0] = ntohll(dst[0]);
238238
tunIPv6->tunDstAddr[1] = ntohll(dst[1]);
239-
tunIPv6->tunProto = Node->tun_proto;
239+
tunIPv6->tunProto = Node->coldNode.tun_proto;
240240
}
241241

242242
// update first_seen, last_seen
@@ -360,7 +360,7 @@ __attribute__((noreturn)) void *flow_thread(void *thread_data) {
360360
fs->bad_packets = 0;
361361
while (1) {
362362
struct FlowNode *Node = Pop_Node(flowParam->NodeList);
363-
if (Node->signal == SIGNAL_SYNC) {
363+
if (Node->hotNode.signal == SIGNAL_SYNC) {
364364
// Flush Exporter Stat to file
365365
FlushExporterStats(fs);
366366
// flush current block and close file
@@ -377,14 +377,14 @@ __attribute__((noreturn)) void *flow_thread(void *thread_data) {
377377
// Dump all exporters to the buffer for new file
378378
FlushStdRecords(fs);
379379

380-
} else if (Node->signal == SIGNAL_DONE) {
380+
} else if (Node->hotNode.signal == SIGNAL_DONE) {
381381
// Flush Exporter Stat to file
382382
FlushExporterStats(fs);
383383
// flush current block and close file
384384
FlushBlock(fs->nffile, fs->dataBlock);
385385
CloseFlowFile(flowParam, Node->timestamp);
386386
break;
387-
} else if (Node->nodeType == FLOW_NODE) {
387+
} else if (Node->hotNode.nodeType == FLOW_NODE) {
388388
StorePcapFlow(flowParam, Node);
389389
} else {
390390
// skip this node

src/nfpcapd/flowsend.c

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -122,26 +122,26 @@ static int ProcessFlow(flowParam_t *flowParam, struct FlowNode *Node) {
122122
// pack V3 record
123123
UpdateRecordSize(EXgenericFlowSize);
124124
PushExtension(recordHeader, EXgenericFlow, genericFlow);
125-
genericFlow->msecFirst = (1000 * Node->t_first.tv_sec) + Node->t_first.tv_usec / 1000;
126-
genericFlow->msecLast = (1000 * Node->t_last.tv_sec) + Node->t_last.tv_usec / 1000;
125+
genericFlow->msecFirst = (1000 * Node->hotNode.t_first.tv_sec) + Node->hotNode.t_first.tv_usec / 1000;
126+
genericFlow->msecLast = (1000 * Node->hotNode.t_last.tv_sec) + Node->hotNode.t_last.tv_usec / 1000;
127127

128128
struct timeval now;
129129
gettimeofday(&now, NULL);
130130
genericFlow->msecReceived = now.tv_sec * 1000L + now.tv_usec / 1000;
131131

132-
genericFlow->inPackets = Node->packets;
133-
genericFlow->inBytes = Node->bytes;
132+
genericFlow->inPackets = Node->hotNode.packets;
133+
genericFlow->inBytes = Node->hotNode.bytes;
134134

135-
genericFlow->tcpFlags = Node->flags;
136-
genericFlow->proto = Node->flowKey.proto;
137-
genericFlow->srcPort = Node->flowKey.src_port;
138-
genericFlow->dstPort = Node->flowKey.dst_port;
135+
genericFlow->tcpFlags = Node->hotNode.flags;
136+
genericFlow->proto = Node->hotNode.flowKey.proto;
137+
genericFlow->srcPort = Node->hotNode.flowKey.src_port;
138+
genericFlow->dstPort = Node->hotNode.flowKey.dst_port;
139139

140-
if (Node->flowKey.version == AF_INET6) {
140+
if (Node->hotNode.flowKey.version == AF_INET6) {
141141
UpdateRecordSize(EXipv6FlowSize);
142142
PushExtension(recordHeader, EXipv6Flow, ipv6Flow);
143-
uint64_t *src = (uint64_t *)Node->flowKey.src_addr.bytes;
144-
uint64_t *dst = (uint64_t *)Node->flowKey.dst_addr.bytes;
143+
uint64_t *src = (uint64_t *)Node->hotNode.flowKey.src_addr.bytes;
144+
uint64_t *dst = (uint64_t *)Node->hotNode.flowKey.dst_addr.bytes;
145145
ipv6Flow->srcAddr[0] = ntohll(src[0]);
146146
ipv6Flow->srcAddr[1] = ntohll(src[1]);
147147
ipv6Flow->dstAddr[0] = ntohll(dst[0]);
@@ -150,48 +150,48 @@ static int ProcessFlow(flowParam_t *flowParam, struct FlowNode *Node) {
150150
UpdateRecordSize(EXipv4FlowSize);
151151
PushExtension(recordHeader, EXipv4Flow, ipv4Flow);
152152
uint32_t ipv4;
153-
memcpy(&ipv4, Node->flowKey.src_addr.bytes + 12, 4);
153+
memcpy(&ipv4, Node->hotNode.flowKey.src_addr.bytes + 12, 4);
154154
ipv4Flow->srcAddr = ntohl(ipv4);
155-
memcpy(&ipv4, Node->flowKey.dst_addr.bytes + 12, 4);
155+
memcpy(&ipv4, Node->hotNode.flowKey.dst_addr.bytes + 12, 4);
156156
ipv4Flow->dstAddr = ntohl(ipv4);
157157
}
158158

159159
if (flowParam->extendedFlow) {
160-
if (Node->vlanID) {
160+
if (Node->coldNode.vlanID) {
161161
UpdateRecordSize(EXvLanSize);
162162
PushExtension(recordHeader, EXvLan, vlan);
163-
vlan->dstVlan = Node->vlanID;
163+
vlan->dstVlan = Node->coldNode.vlanID;
164164
}
165165

166166
UpdateRecordSize(EXmacAddrSize);
167167
PushExtension(recordHeader, EXmacAddr, macAddr);
168-
macAddr->inSrcMac = ntohll(Node->srcMac) >> 16;
169-
macAddr->outDstMac = ntohll(Node->dstMac) >> 16;
168+
macAddr->inSrcMac = ntohll(Node->coldNode.srcMac) >> 16;
169+
macAddr->outDstMac = ntohll(Node->coldNode.dstMac) >> 16;
170170
macAddr->inDstMac = 0;
171171
macAddr->outSrcMac = 0;
172172

173-
if (Node->mpls[0]) {
173+
if (Node->coldNode.mpls[0]) {
174174
UpdateRecordSize(EXmplsLabelSize);
175175
PushExtension(recordHeader, EXmplsLabel, mplsLabel);
176-
for (int i = 0; Node->mpls[i] != 0; i++) {
177-
mplsLabel->mplsLabel[i] = ntohl(Node->mpls[i]) >> 8;
176+
for (int i = 0; Node->coldNode.mpls[i] != 0; i++) {
177+
mplsLabel->mplsLabel[i] = ntohl(Node->coldNode.mpls[i]) >> 8;
178178
}
179179
}
180180

181-
if (Node->flowKey.proto == IPPROTO_TCP) {
181+
if (Node->hotNode.flowKey.proto == IPPROTO_TCP) {
182182
UpdateRecordSize(EXlatencySize);
183183
PushExtension(recordHeader, EXlatency, latency);
184-
latency->usecClientNwDelay = Node->latency.client;
185-
latency->usecServerNwDelay = Node->latency.server;
186-
latency->usecApplLatency = Node->latency.application;
184+
latency->usecClientNwDelay = Node->coldNode.latency.client;
185+
latency->usecServerNwDelay = Node->coldNode.latency.server;
186+
latency->usecApplLatency = Node->coldNode.latency.application;
187187
}
188188
}
189189

190190
if (flowParam->addPayload) {
191-
if (Node->payloadSize) {
192-
UpdateRecordSize(EXinPayloadSize + Node->payloadSize);
193-
PushVarLengthPointer(recordHeader, EXinPayload, inPayload, Node->payloadSize);
194-
memcpy(inPayload, Node->payload, Node->payloadSize);
191+
if (Node->coldNode.payloadSize) {
192+
UpdateRecordSize(EXinPayloadSize + Node->coldNode.payloadSize);
193+
PushVarLengthPointer(recordHeader, EXinPayload, inPayload, Node->coldNode.payloadSize);
194+
memcpy(inPayload, Node->coldNode.payload, Node->coldNode.payloadSize);
195195
}
196196
}
197197

@@ -240,9 +240,9 @@ __attribute__((noreturn)) void *sendflow_thread(void *thread_data) {
240240
printRecord = flowParam->printRecord;
241241
while (1) {
242242
struct FlowNode *Node = Pop_Node(flowParam->NodeList);
243-
if (Node->signal == SIGNAL_SYNC) {
243+
if (Node->hotNode.signal == SIGNAL_SYNC) {
244244
// skip
245-
} else if (Node->signal == SIGNAL_DONE) {
245+
} else if (Node->hotNode.signal == SIGNAL_DONE) {
246246
CloseSender(flowParam, Node->timestamp);
247247
break;
248248
} else {

0 commit comments

Comments
 (0)