Skip to content

Commit f7c7e1a

Browse files
authored
NUTCH-3150 Expand Caching Hadoop Counter References (#892)
1 parent 1242e22 commit f7c7e1a

14 files changed

Lines changed: 343 additions & 92 deletions

src/java/org/apache/nutch/crawl/CrawlDbFilter.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.slf4j.Logger;
2323
import org.slf4j.LoggerFactory;
2424
import org.apache.hadoop.io.Text;
25+
import org.apache.hadoop.mapreduce.Counter;
2526
import org.apache.hadoop.mapreduce.Mapper;
2627
import org.apache.hadoop.conf.Configuration;
2728
import org.apache.nutch.metrics.NutchMetrics;
@@ -50,6 +51,11 @@ public class CrawlDbFilter extends
5051

5152
private String scope;
5253

54+
// Cached counter references for performance
55+
private Counter goneRecordsRemovedCounter;
56+
private Counter orphanRecordsRemovedCounter;
57+
private Counter urlsFilteredCounter;
58+
5359
private static final Logger LOG = LoggerFactory
5460
.getLogger(MethodHandles.lookup().lookupClass());
5561

@@ -68,6 +74,21 @@ public void setup(Mapper<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
6874
scope = conf.get(URL_NORMALIZING_SCOPE, URLNormalizers.SCOPE_CRAWLDB);
6975
normalizers = new URLNormalizers(conf, scope);
7076
}
77+
78+
// Initialize cached counter references
79+
initCounters(context);
80+
}
81+
82+
/**
83+
* Initialize cached counter references to avoid repeated lookups in hot paths.
84+
*/
85+
private void initCounters(Context context) {
86+
goneRecordsRemovedCounter = context.getCounter(
87+
NutchMetrics.GROUP_CRAWLDB_FILTER, NutchMetrics.CRAWLDB_GONE_RECORDS_REMOVED_TOTAL);
88+
orphanRecordsRemovedCounter = context.getCounter(
89+
NutchMetrics.GROUP_CRAWLDB_FILTER, NutchMetrics.CRAWLDB_ORPHAN_RECORDS_REMOVED_TOTAL);
90+
urlsFilteredCounter = context.getCounter(
91+
NutchMetrics.GROUP_CRAWLDB_FILTER, NutchMetrics.CRAWLDB_URLS_FILTERED_TOTAL);
7192
}
7293

7394
private Text newKey = new Text();
@@ -81,15 +102,13 @@ public void map(Text key, CrawlDatum value,
81102
// https://issues.apache.org/jira/browse/NUTCH-1101 check status first,
82103
// cheaper than normalizing or filtering
83104
if (url404Purging && CrawlDatum.STATUS_DB_GONE == value.getStatus()) {
84-
context.getCounter(NutchMetrics.GROUP_CRAWLDB_FILTER,
85-
NutchMetrics.CRAWLDB_GONE_RECORDS_REMOVED_TOTAL).increment(1);
105+
goneRecordsRemovedCounter.increment(1);
86106
return;
87107
}
88108
// Whether to remove orphaned pages
89109
// https://issues.apache.org/jira/browse/NUTCH-1932
90110
if (purgeOrphans && CrawlDatum.STATUS_DB_ORPHAN == value.getStatus()) {
91-
context.getCounter(NutchMetrics.GROUP_CRAWLDB_FILTER,
92-
NutchMetrics.CRAWLDB_ORPHAN_RECORDS_REMOVED_TOTAL).increment(1);
111+
orphanRecordsRemovedCounter.increment(1);
93112
return;
94113
}
95114
if (url != null && urlNormalizers) {
@@ -109,8 +128,7 @@ public void map(Text key, CrawlDatum value,
109128
}
110129
}
111130
if (url == null) {
112-
context.getCounter(NutchMetrics.GROUP_CRAWLDB_FILTER,
113-
NutchMetrics.CRAWLDB_URLS_FILTERED_TOTAL).increment(1);
131+
urlsFilteredCounter.increment(1);
114132
} else {
115133
// URL has passed filters
116134
newKey.set(url); // collect it

src/java/org/apache/nutch/crawl/CrawlDbReducer.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@
1818

1919
import java.lang.invoke.MethodHandles;
2020
import java.util.ArrayList;
21+
import java.util.HashMap;
2122
import java.util.List;
23+
import java.util.Map;
2224
import java.util.Map.Entry;
2325
import java.io.IOException;
2426

2527
import org.slf4j.Logger;
2628
import org.slf4j.LoggerFactory;
2729

30+
import org.apache.hadoop.mapreduce.Counter;
2831
import org.apache.hadoop.mapreduce.Reducer;
2932
import org.apache.hadoop.conf.Configuration;
3033
import org.apache.hadoop.io.Text;
@@ -52,6 +55,9 @@ public class CrawlDbReducer extends
5255
private FetchSchedule schedule;
5356
private ErrorTracker errorTracker;
5457

58+
// Cached counter references for status-based metrics
59+
private Map<Byte, Counter> statusCounters = new HashMap<>();
60+
5561
@Override
5662
public void setup(Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
5763
Configuration conf = context.getConfiguration();
@@ -66,6 +72,15 @@ public void setup(Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
6672
errorTracker = new ErrorTracker(NutchMetrics.GROUP_CRAWLDB, context);
6773
}
6874

75+
/**
76+
* Get counter for status, caching for subsequent lookups.
77+
*/
78+
private Counter getStatusCounter(byte status, Context context) {
79+
return statusCounters.computeIfAbsent(status,
80+
s -> context.getCounter(NutchMetrics.GROUP_CRAWLDB,
81+
CrawlDatum.getStatusName(s)));
82+
}
83+
6984
@Override
7085
public void reduce(Text key, Iterable<CrawlDatum> values,
7186
Context context) throws IOException, InterruptedException {
@@ -170,8 +185,7 @@ public void reduce(Text key, Iterable<CrawlDatum> values,
170185
}
171186
context.write(key, old);
172187
// Dynamic counter based on status name
173-
context.getCounter(NutchMetrics.GROUP_CRAWLDB,
174-
CrawlDatum.getStatusName(old.getStatus())).increment(1);
188+
getStatusCounter(old.getStatus(), context).increment(1);
175189
} else {
176190
LOG.warn("Missing fetch and old value, signature={}",
177191
StringUtil.toHexString(signature));
@@ -329,8 +343,7 @@ public void reduce(Text key, Iterable<CrawlDatum> values,
329343
result.getMetaData().remove(Nutch.WRITABLE_GENERATE_TIME_KEY);
330344
context.write(key, result);
331345
// Dynamic counter based on status name
332-
context.getCounter(NutchMetrics.GROUP_CRAWLDB,
333-
CrawlDatum.getStatusName(result.getStatus())).increment(1);
346+
getStatusCounter(result.getStatus(), context).increment(1);
334347
}
335348

336349
}

src/java/org/apache/nutch/crawl/DeduplicationJob.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,20 +128,33 @@ public static class DedupReducer<K extends Writable>
128128

129129
protected String[] compareOrder;
130130

131+
// Cached counter reference for performance
132+
private Counter documentsMarkedDuplicateCounter;
133+
131134
@Override
132135
public void setup(
133136
Reducer<K, CrawlDatum, Text, CrawlDatum>.Context context) {
134137
Configuration conf = context.getConfiguration();
135138
compareOrder = conf.get(DEDUPLICATION_COMPARE_ORDER).split(",");
139+
140+
// Initialize cached counter reference
141+
initCounters(context);
142+
}
143+
144+
/**
145+
* Initialize cached counter references to avoid repeated lookups in hot paths.
146+
*/
147+
private void initCounters(Context context) {
148+
documentsMarkedDuplicateCounter = context.getCounter(
149+
NutchMetrics.GROUP_DEDUP, NutchMetrics.DEDUP_DOCUMENTS_MARKED_DUPLICATE_TOTAL);
136150
}
137151

138152
protected void writeOutAsDuplicate(CrawlDatum datum,
139153
Context context)
140154
throws IOException, InterruptedException {
141155
datum.setStatus(CrawlDatum.STATUS_DB_DUPLICATE);
142156
Text key = (Text) datum.getMetaData().remove(urlKey);
143-
context.getCounter(NutchMetrics.GROUP_DEDUP,
144-
NutchMetrics.DEDUP_DOCUMENTS_MARKED_DUPLICATE_TOTAL).increment(1);
157+
documentsMarkedDuplicateCounter.increment(1);
145158
context.write(key, datum);
146159
}
147160

src/java/org/apache/nutch/crawl/Generator.java

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,17 @@ public static class SelectorMapper
194194
private JexlScript expr = null;
195195
private ErrorTracker errorTracker;
196196

197+
// Cached counter references for performance
198+
private Counter urlFiltersRejectedCounter;
199+
private Counter scheduleRejectedCounter;
200+
private Counter waitForUpdateCounter;
201+
private Counter exprRejectedCounter;
202+
private Counter statusRejectedCounter;
203+
private Counter scoreTooLowCounter;
204+
private Counter intervalRejectedCounter;
205+
private Counter hostsAffectedPerHostOverflowCounter;
206+
private Counter urlsSkippedPerHostOverflowCounter;
207+
197208
@Override
198209
public void setup(
199210
Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>.Context context)
@@ -219,6 +230,32 @@ public void setup(
219230
expr = JexlUtil.parseExpression(conf.get(GENERATOR_EXPR, null));
220231
// Initialize error tracker with cached counters
221232
errorTracker = new ErrorTracker(NutchMetrics.GROUP_GENERATOR, context);
233+
// Initialize cached counter references
234+
initCounters(context);
235+
}
236+
237+
/**
238+
* Initialize cached counter references to avoid repeated lookups in hot paths.
239+
*/
240+
private void initCounters(Context context) {
241+
urlFiltersRejectedCounter = context.getCounter(
242+
NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_URL_FILTERS_REJECTED_TOTAL);
243+
scheduleRejectedCounter = context.getCounter(
244+
NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_SCHEDULE_REJECTED_TOTAL);
245+
waitForUpdateCounter = context.getCounter(
246+
NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_WAIT_FOR_UPDATE_TOTAL);
247+
exprRejectedCounter = context.getCounter(
248+
NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_EXPR_REJECTED_TOTAL);
249+
statusRejectedCounter = context.getCounter(
250+
NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_STATUS_REJECTED_TOTAL);
251+
scoreTooLowCounter = context.getCounter(
252+
NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_SCORE_TOO_LOW_TOTAL);
253+
intervalRejectedCounter = context.getCounter(
254+
NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_INTERVAL_REJECTED_TOTAL);
255+
hostsAffectedPerHostOverflowCounter = context.getCounter(
256+
NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_HOSTS_AFFECTED_PER_HOST_OVERFLOW_TOTAL);
257+
urlsSkippedPerHostOverflowCounter = context.getCounter(
258+
NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_URLS_SKIPPED_PER_HOST_OVERFLOW_TOTAL);
222259
}
223260

224261
@Override
@@ -230,8 +267,7 @@ public void map(Text key, CrawlDatum value, Context context)
230267
// URLFilters
231268
try {
232269
if (filters.filter(url.toString()) == null) {
233-
context.getCounter(NutchMetrics.GROUP_GENERATOR,
234-
NutchMetrics.GENERATOR_URL_FILTERS_REJECTED_TOTAL).increment(1);
270+
urlFiltersRejectedCounter.increment(1);
235271
return;
236272
}
237273
} catch (URLFilterException e) {
@@ -245,8 +281,7 @@ public void map(Text key, CrawlDatum value, Context context)
245281
if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
246282
LOG.debug("-shouldFetch rejected '{}', fetchTime={}, curTime={}", url,
247283
crawlDatum.getFetchTime(), curTime);
248-
context.getCounter(NutchMetrics.GROUP_GENERATOR,
249-
NutchMetrics.GENERATOR_SCHEDULE_REJECTED_TOTAL).increment(1);
284+
scheduleRejectedCounter.increment(1);
250285
return;
251286
}
252287

@@ -255,8 +290,7 @@ public void map(Text key, CrawlDatum value, Context context)
255290
if (oldGenTime != null) { // awaiting fetch & update
256291
if (oldGenTime.get() + genDelay > curTime) { // still wait for
257292
// update
258-
context.getCounter(NutchMetrics.GROUP_GENERATOR,
259-
NutchMetrics.GENERATOR_WAIT_FOR_UPDATE_TOTAL).increment(1);
293+
waitForUpdateCounter.increment(1);
260294
return;
261295
}
262296
}
@@ -271,31 +305,27 @@ public void map(Text key, CrawlDatum value, Context context)
271305
// check expr
272306
if (expr != null) {
273307
if (!crawlDatum.execute(expr, key.toString())) {
274-
context.getCounter(NutchMetrics.GROUP_GENERATOR,
275-
NutchMetrics.GENERATOR_EXPR_REJECTED_TOTAL).increment(1);
308+
exprRejectedCounter.increment(1);
276309
return;
277310
}
278311
}
279312

280313
if (restrictStatus != -1 && restrictStatus != crawlDatum.getStatus()) {
281-
context.getCounter(NutchMetrics.GROUP_GENERATOR,
282-
NutchMetrics.GENERATOR_STATUS_REJECTED_TOTAL).increment(1);
314+
statusRejectedCounter.increment(1);
283315
return;
284316
}
285317

286318
// consider only entries with a score superior to the threshold
287319
if (!Float.isNaN(scoreThreshold) && sort < scoreThreshold) {
288-
context.getCounter(NutchMetrics.GROUP_GENERATOR,
289-
NutchMetrics.GENERATOR_SCORE_TOO_LOW_TOTAL).increment(1);
320+
scoreTooLowCounter.increment(1);
290321
return;
291322
}
292323

293324
// consider only entries with a retry (or fetch) interval lower than
294325
// threshold
295326
if (intervalThreshold != -1
296327
&& crawlDatum.getFetchInterval() > intervalThreshold) {
297-
context.getCounter(NutchMetrics.GROUP_GENERATOR,
298-
NutchMetrics.GENERATOR_INTERVAL_REJECTED_TOTAL).increment(1);
328+
intervalRejectedCounter.increment(1);
299329
return;
300330
}
301331

@@ -332,6 +362,10 @@ public static class SelectorReducer extends
332362
private Map<String, HostDatum> hostDatumCache = new HashMap<>();
333363
private ErrorTracker errorTracker;
334364

365+
// Cached counter references for performance
366+
private Counter hostsAffectedPerHostOverflowCounter;
367+
private Counter urlsSkippedPerHostOverflowCounter;
368+
335369
public void readHostDb() throws IOException {
336370
if (conf.get(GENERATOR_HOSTDB) == null) {
337371
return;
@@ -426,10 +460,22 @@ public void setup(Context context) throws IOException {
426460
}
427461
// Initialize error tracker with cached counters
428462
errorTracker = new ErrorTracker(NutchMetrics.GROUP_GENERATOR, context);
463+
// Initialize cached counter references
464+
initReducerCounters(context);
429465

430466
readHostDb();
431467
}
432468

469+
/**
470+
* Initialize cached counter references to avoid repeated lookups in hot paths.
471+
*/
472+
private void initReducerCounters(Context context) {
473+
hostsAffectedPerHostOverflowCounter = context.getCounter(
474+
NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_HOSTS_AFFECTED_PER_HOST_OVERFLOW_TOTAL);
475+
urlsSkippedPerHostOverflowCounter = context.getCounter(
476+
NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_URLS_SKIPPED_PER_HOST_OVERFLOW_TOTAL);
477+
}
478+
433479
@Override
434480
public void cleanup(Context context)
435481
throws IOException, InterruptedException {
@@ -555,15 +601,13 @@ public void reduce(FloatWritable key, Iterable<SelectorEntry> values,
555601
hostCount[1] = 1;
556602
} else {
557603
if (hostCount[1] == (maxCount+1)) {
558-
context.getCounter(NutchMetrics.GROUP_GENERATOR,
559-
NutchMetrics.GENERATOR_HOSTS_AFFECTED_PER_HOST_OVERFLOW_TOTAL).increment(1);
604+
hostsAffectedPerHostOverflowCounter.increment(1);
560605
LOG.info(
561606
"Host or domain {} has more than {} URLs for all {} segments. Additional URLs won't be included in the fetchlist.",
562607
hostordomain, maxCount, maxNumSegments);
563608
}
564609
// skip this entry
565-
context.getCounter(NutchMetrics.GROUP_GENERATOR,
566-
NutchMetrics.GENERATOR_URLS_SKIPPED_PER_HOST_OVERFLOW_TOTAL).increment(1);
610+
urlsSkippedPerHostOverflowCounter.increment(1);
567611
continue;
568612
}
569613
}

0 commit comments

Comments (0)