Skip to content

Commit 5d288c3

Browse files
authored
feat: store data file size in manifest (#3750)
Part of #3751. * When we write data files, we store the size in bytes in the manifest. * We use this size to skip the `HEAD` request needed to find the file footer byte range. * When writing a dataset, we can read any missing sizes and fill them in.
1 parent 17421b4 commit 5d288c3

27 files changed

Lines changed: 171 additions & 54 deletions

File tree

java/core/lance-jni/src/file_reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ fn inner_open<'local>(env: &mut JNIEnv<'local>, file_uri: JString) -> Result<JOb
9292
let config = SchedulerConfig::max_bandwidth(&obj_store);
9393
let scan_scheduler = ScanScheduler::new(obj_store, config);
9494

95-
let file_scheduler = scan_scheduler.open_file(&Path::parse(&path)?).await?;
95+
let file_scheduler = scan_scheduler.open_file(&Path::parse(&path)?, None).await?;
9696
FileReader::try_open(
9797
file_scheduler,
9898
None,

java/core/lance-jni/src/fragment.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ fn create_fragment<'a>(
223223
}
224224

225225
const DATA_FILE_CLASS: &str = "com/lancedb/lance/fragment/DataFile";
226-
const DATA_FILE_CONSTRUCTOR_SIG: &str = "(Ljava/lang/String;[I[III)V";
226+
const DATA_FILE_CONSTRUCTOR_SIG: &str = "(Ljava/lang/String;[I[IIILjava/lang/Long;)V";
227227
const DELETE_FILE_CLASS: &str = "com/lancedb/lance/fragment/DeletionFile";
228228
const DELETE_FILE_CONSTRUCTOR_SIG: &str =
229229
"(JJLjava/lang/Long;Lcom/lancedb/lance/fragment/DeletionFileType;)V";
@@ -238,6 +238,10 @@ impl IntoJava for &DataFile {
238238
let path = env.new_string(self.path.clone())?.into();
239239
let fields = JLance(self.fields.clone()).into_java(env)?;
240240
let column_indices = JLance(self.column_indices.clone()).into_java(env)?;
241+
let file_size_bytes = match self.file_size_bytes {
242+
Some(f) => JLance(f as i64).into_java(env)?,
243+
None => JObject::null(),
244+
};
241245
Ok(env.new_object(
242246
DATA_FILE_CLASS,
243247
DATA_FILE_CONSTRUCTOR_SIG,
@@ -247,6 +251,7 @@ impl IntoJava for &DataFile {
247251
JValueGen::Object(&column_indices),
248252
JValueGen::Int(self.file_major_version as i32),
249253
JValueGen::Int(self.file_minor_version as i32),
254+
JValueGen::Object(&file_size_bytes),
250255
],
251256
)?)
252257
}
@@ -453,12 +458,17 @@ impl FromJObjectWithEnv<DataFile> for JObject<'_> {
453458
let file_minor_version = env
454459
.call_method(self, "getFileMinorVersion", "()I", &[])?
455460
.i()? as u32;
461+
let file_size_bytes: Option<i64> = env
462+
.call_method(self, "getFileSizeBytes", "()Ljava/lang/Long;", &[])?
463+
.l()?
464+
.extract_object(env)?;
456465
Ok(DataFile {
457466
path,
458467
fields,
459468
column_indices,
460469
file_major_version,
461470
file_minor_version,
471+
file_size_bytes: file_size_bytes.map(|r| r as u64),
462472
})
463473
}
464474
}

java/core/lance-jni/src/traits.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ impl IntoJava for JLance<usize> {
173173
}
174174
}
175175

176+
impl IntoJava for JLance<i64> {
177+
fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result<JObject<'a>> {
178+
Ok(env.new_object("java/lang/Long", "(J)V", &[JValueGen::Long(self.0)])?)
179+
}
180+
}
181+
176182
impl IntoJava for JLance<Option<usize>> {
177183
fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result<JObject<'a>> {
178184
let obj = match self.0 {

java/core/src/main/java/com/lancedb/lance/fragment/DataFile.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,21 @@ public class DataFile implements Serializable {
2424
private final int[] columnIndices;
2525
private final int fileMajorVersion;
2626
private final int fileMinorVersion;
27+
private final Long fileSizeBytes;
2728

2829
public DataFile(
29-
String path, int[] fields, int[] columnIndices, int fileMajorVersion, int fileMinorVersion) {
30+
String path,
31+
int[] fields,
32+
int[] columnIndices,
33+
int fileMajorVersion,
34+
int fileMinorVersion,
35+
Long fileSizeBytes) {
3036
this.path = path;
3137
this.fields = fields;
3238
this.columnIndices = columnIndices;
3339
this.fileMajorVersion = fileMajorVersion;
3440
this.fileMinorVersion = fileMinorVersion;
41+
this.fileSizeBytes = fileSizeBytes;
3542
}
3643

3744
public String getPath() {
@@ -54,6 +61,10 @@ public int getFileMinorVersion() {
5461
return fileMinorVersion;
5562
}
5663

64+
public Long getFileSizeBytes() {
65+
return fileSizeBytes;
66+
}
67+
5768
@Override
5869
public String toString() {
5970
return new ToStringBuilder(this)

protos/table.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,13 @@ message DataFile {
308308
// If both `file_major_version` and `file_minor_version` are set to 0,
309309
// then this is a version 0.1 or version 0.2 file.
310310
uint32 file_minor_version = 5;
311+
312+
// The known size of the file on disk in bytes.
313+
//
314+
// This is used to quickly find the footer of the file.
315+
//
316+
// When this is zero, it should be interpreted as "unknown".
317+
uint64 file_size_bytes = 6;
311318
} // DataFile
312319

313320
// Deletion File

python/python/lance/fragment.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,13 +145,16 @@ class DataFile:
145145
The major version of the data storage format.
146146
file_minor_version : int
147147
The minor version of the data storage format.
148+
file_size_bytes : Optional[int]
149+
The size of the data file in bytes, if available.
148150
"""
149151

150152
_path: str
151153
fields: List[int]
152154
column_indices: List[int] = field(default_factory=list)
153155
file_major_version: int = 0
154156
file_minor_version: int = 0
157+
file_size_bytes: Optional[int] = None
155158

156159
def __init__(
157160
self,
@@ -160,21 +163,24 @@ def __init__(
160163
column_indices: List[int] = None,
161164
file_major_version: int = 0,
162165
file_minor_version: int = 0,
166+
file_size_bytes: Optional[int] = None,
163167
):
164168
# TODO: only we eliminate the path method, we can remove this
165169
self._path = path
166170
self.fields = fields
167171
self.column_indices = column_indices or []
168172
self.file_major_version = file_major_version
169173
self.file_minor_version = file_minor_version
174+
self.file_size_bytes = file_size_bytes
170175

171176
def __repr__(self):
172177
# pretend we have a 'path' attribute
173178
return (
174179
f"DataFile(path='{self._path}', fields={self.fields}, "
175180
f"column_indices={self.column_indices}, "
176181
f"file_major_version={self.file_major_version}, "
177-
f"file_minor_version={self.file_minor_version})"
182+
f"file_minor_version={self.file_minor_version}, "
183+
f"file_size_bytes={self.file_size_bytes})"
178184
)
179185

180186
@property

python/python/tests/test_fragment.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ def test_fragment_meta():
245245
data = {
246246
"id": 0,
247247
"files": [
248-
{"path": "0.lance", "fields": [0]},
248+
{"path": "0.lance", "fields": [0], "file_size_bytes": 100},
249249
{"path": "1.lance", "fields": [1]},
250250
],
251251
"deletion_file": None,
@@ -261,10 +261,10 @@ def test_fragment_meta():
261261

262262
assert repr(meta) == (
263263
"FragmentMetadata(id=0, files=[DataFile(path='0.lance', fields=[0], "
264-
"column_indices=[], file_major_version=0, file_minor_version=0), "
265-
"DataFile(path='1.lance', fields=[1], column_indices=[], "
266-
"file_major_version=0, file_minor_version=0)], physical_rows=100, "
267-
"deletion_file=None, row_id_meta=None)"
264+
"column_indices=[], file_major_version=0, file_minor_version=0, "
265+
"file_size_bytes=100), DataFile(path='1.lance', fields=[1], column_indices=[], "
266+
"file_major_version=0, file_minor_version=0, file_size_bytes=None)], "
267+
"physical_rows=100, deletion_file=None, row_id_meta=None)"
268268
)
269269

270270

python/src/file.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ impl LanceFileReader {
398398
io_buffer_size_bytes: 2 * 1024 * 1024 * 1024,
399399
},
400400
);
401-
let file = scheduler.open_file(&path).await.infer_error()?;
401+
let file = scheduler.open_file(&path, None).await.infer_error()?;
402402
let inner = FileReader::try_open(
403403
file,
404404
None,

python/src/fragment.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,7 @@ impl FromPyObject<'_> for PyLance<DataFile> {
658658
column_indices: ob.getattr("column_indices")?.extract()?,
659659
file_major_version: ob.getattr("file_major_version")?.extract()?,
660660
file_minor_version: ob.getattr("file_minor_version")?.extract()?,
661+
file_size_bytes: ob.getattr("file_size_bytes")?.extract()?,
661662
}))
662663
}
663664
}
@@ -679,6 +680,7 @@ impl<'py> IntoPyObject<'py> for PyLance<&DataFile> {
679680
self.0.column_indices.clone(),
680681
self.0.file_major_version,
681682
self.0.file_minor_version,
683+
self.0.file_size_bytes,
682684
))
683685
}
684686
}

rust/lance-file/benches/reader.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ fn bench_reader(c: &mut Criterion) {
6666
object_store.clone(),
6767
SchedulerConfig::default_for_testing(),
6868
);
69-
let scheduler = store_scheduler.open_file(file_path).await.unwrap();
69+
let scheduler = store_scheduler.open_file(file_path, None).await.unwrap();
7070
let reader = FileReader::try_open(
7171
scheduler.clone(),
7272
None,
@@ -159,7 +159,7 @@ fn bench_random_access(c: &mut Criterion) {
159159
let reader = rt.block_on(async move {
160160
let store_scheduler =
161161
ScanScheduler::new(object_store.clone(), SchedulerConfig::default_for_testing());
162-
let scheduler = store_scheduler.open_file(file_path).await.unwrap();
162+
let scheduler = store_scheduler.open_file(file_path, None).await.unwrap();
163163
Arc::new(
164164
FileReader::try_open(
165165
scheduler.clone(),

0 commit comments

Comments
 (0)