-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Fix HNSW InfoStream duplicate times and add per-chunk completion logging #15978
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
512d5b4
01fd29e
3afd11d
b14a463
de39a78
4e9d5a5
1d1f0b2
e50f9c9
ba36eac
c8f5ede
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,7 +26,7 @@ | |
| import java.util.Locale; | ||
| import java.util.Objects; | ||
| import java.util.SplittableRandom; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import java.util.concurrent.locks.Lock; | ||
| import org.apache.lucene.internal.hppc.IntHashSet; | ||
| import org.apache.lucene.search.KnnCollector; | ||
|
|
@@ -85,6 +85,29 @@ public class HnswGraphBuilder implements HnswBuilder { | |
| protected InfoStream infoStream = InfoStream.getDefault(); | ||
| protected boolean frozen; | ||
|
|
||
| /** | ||
| * Merge-level start time in nanoseconds. When set, the periodic progress prints (every 10K | ||
| * vectors) show elapsed time since the overall merge began rather than since the current chunk | ||
| * began. A value of -1 means not set (non-concurrent path). | ||
| */ | ||
| private long mergeStartTimeNs = -1; | ||
|
|
||
| /** | ||
| * Shared accumulator for total worker time across all concurrent merge workers. Each chunk's | ||
| * elapsed time is added here so that effective concurrency can be computed at merge end. | ||
| */ | ||
| private AtomicLong cumulativeWorkTimeNs; | ||
|
|
||
| /** Set the merge-level start time so progress prints show time since merge began. */ | ||
| void setMergeStartTimeNs(long mergeStartTimeNs) { | ||
| this.mergeStartTimeNs = mergeStartTimeNs; | ||
| } | ||
|
|
||
| /** Set the shared accumulator for tracking cumulative worker time across concurrent chunks. */ | ||
| void setCumulativeWorkTimeNs(AtomicLong cumulativeWorkTimeNs) { | ||
| this.cumulativeWorkTimeNs = cumulativeWorkTimeNs; | ||
| } | ||
|
|
||
| public static HnswGraphBuilder create( | ||
| RandomVectorScorerSupplier scorerSupplier, int M, int beamWidth, long seed) | ||
| throws IOException { | ||
|
|
@@ -204,16 +227,30 @@ protected void addVectors(int minOrd, int maxOrd) throws IOException { | |
| if (frozen) { | ||
| throw new IllegalStateException("This HnswGraphBuilder is frozen and cannot be updated"); | ||
| } | ||
| long start = System.nanoTime(), t = start; | ||
| if (infoStream.isEnabled(HNSW_COMPONENT)) { | ||
| infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")"); | ||
| } | ||
| long startNs = System.nanoTime(); | ||
| long progressStartNs = mergeStartTimeNs != -1 ? mergeStartTimeNs : startNs; | ||
| for (int node = minOrd; node < maxOrd; node++) { | ||
| addGraphNode(node); | ||
| if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) { | ||
| t = printGraphBuildStatus(node, start, t); | ||
| printGraphBuildStatus(node, progressStartNs); | ||
| } | ||
| } | ||
| long chunkedElapsedNs = System.nanoTime() - startNs; | ||
| if (cumulativeWorkTimeNs != null) { | ||
| cumulativeWorkTimeNs.addAndGet(chunkedElapsedNs); | ||
| } | ||
| if (infoStream.isEnabled(HNSW_COMPONENT)) { | ||
| double elapsedMs = chunkedElapsedNs / 1_000_000.0; | ||
| infoStream.message( | ||
| HNSW_COMPONENT, | ||
| String.format( | ||
| Locale.ROOT, | ||
| "addVectors [%d %d): %d vectors in %.2f ms", | ||
| minOrd, | ||
| maxOrd, | ||
| maxOrd - minOrd, | ||
| elapsedMs)); | ||
| } | ||
| } | ||
|
|
||
| private void addVectors(int maxOrd) throws IOException { | ||
|
|
@@ -339,17 +376,10 @@ public void addGraphNode(int node, IntHashSet eps0) throws IOException { | |
| addGraphNodeInternal(node, scorer, eps0); | ||
| } | ||
|
|
||
| private long printGraphBuildStatus(int node, long start, long t) { | ||
| long now = System.nanoTime(); | ||
| private void printGraphBuildStatus(int node, long startNs) { | ||
| double elapsedMs = (System.nanoTime() - startNs) / 1_000_000.0; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, can we be consistent about
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure, updated. Thanks! |
||
| infoStream.message( | ||
| HNSW_COMPONENT, | ||
| String.format( | ||
| Locale.ROOT, | ||
| "built %d in %d/%d ms", | ||
| node, | ||
| TimeUnit.NANOSECONDS.toMillis(now - t), | ||
| TimeUnit.NANOSECONDS.toMillis(now - start))); | ||
| return now; | ||
| HNSW_COMPONENT, String.format(Locale.ROOT, "built %d in %.2f ms", node, elapsedMs)); | ||
| } | ||
|
|
||
| void addDiverseNeighbors( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is awesome! I wonder if, separately from this new
InfoStreamlog line, we could also aggregate these times and then (when HNSW merge is completely done, upstairs) add another summaryInfoStreamlog line stating the effective concurrency? Could maybe just be anAtomicLongthat each chunk increments its elapsed time into, then upstairs at HNSW merge end just divide that value by elapsed wall clock time to get &InfoStream.messagethe implied concurrency?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure! Added a shared AtomicLong cumulativeWorkTimeNS that each chunk increments with its elapsed time. At the end of HnswConcurrentMergeBuilder.build(), after all workers finish, it logs:
merge completed: 100000 vectors, 12500.00 ms wall clock, 42300.45 ms cumulative worker time, 3.38x effective concurrencyThis should make it straightforward to detect the single threaded merge scenario, it would show ~1.0x effective concurrency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yay, this is awesome! Can't wait for nightly build to run on this then I scrutinize the
InfoStream-- first light for this new metric!