Skip to content

Commit 7257a24

Browse files
committed
Update to use object_store 0.13 in hdfs.rs.
1 parent 0de1381 commit 7257a24

2 files changed

Lines changed: 42 additions & 135 deletions

File tree

native/core/src/execution/operators/parquet_writer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,7 @@ mod tests {
583583

584584
/// Helper function to create a test RecordBatch with 1000 rows of (int, string) data
585585
/// Example batch_id 1 -> 0..1000, 2 -> 1001..2000
586+
#[allow(dead_code)]
586587
fn create_test_record_batch(batch_id: i32) -> Result<RecordBatch> {
587588
assert!(batch_id > 0, "batch_id must be greater than 0");
588589
let num_rows = batch_id * 1000;

native/hdfs/src/object_store/hdfs.rs

Lines changed: 41 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ use fs_hdfs::walkdir::HdfsWalkDir;
3131
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
3232
use object_store::{
3333
path::{self, Path},
34-
Error, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
35-
ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
34+
CopyMode, CopyOptions, Error, GetOptions, GetRange, GetResult, GetResultPayload, ListResult,
35+
MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload,
36+
PutResult, Result,
3637
};
3738

3839
/// scheme for HDFS File System
@@ -144,62 +145,6 @@ impl ObjectStore for HadoopFileSystem {
144145
unimplemented!()
145146
}
146147

147-
async fn get(&self, location: &Path) -> Result<GetResult> {
148-
let hdfs = self.hdfs.clone();
149-
let hdfs_root = self.hdfs.url().to_owned();
150-
let location = HadoopFileSystem::path_to_filesystem(location);
151-
152-
let (blob, object_metadata, range) = maybe_spawn_blocking(move || {
153-
let file = hdfs.open(&location).map_err(to_error)?;
154-
155-
let file_status = file.get_file_status().map_err(to_error)?;
156-
157-
let to_read = file_status.len();
158-
let mut total_read = 0;
159-
let mut buf = vec![0; to_read];
160-
while total_read < to_read {
161-
let read = file.read(buf.as_mut_slice()).map_err(to_error)?;
162-
if read <= 0 {
163-
break;
164-
}
165-
total_read += read as usize;
166-
}
167-
168-
if total_read != to_read {
169-
return Err(Error::Generic {
170-
store: "HadoopFileSystem",
171-
source: Box::new(HdfsErr::Generic(format!(
172-
"Error reading path {} with expected size {} and actual size {}",
173-
file.path(),
174-
to_read,
175-
total_read
176-
))),
177-
});
178-
}
179-
180-
file.close().map_err(to_error)?;
181-
182-
let object_metadata = convert_metadata(file_status.clone(), &hdfs_root);
183-
184-
let range = Range {
185-
start: 0,
186-
end: file_status.len() as u64,
187-
};
188-
189-
Ok((buf.into(), object_metadata, range))
190-
})
191-
.await?;
192-
193-
Ok(GetResult {
194-
payload: GetResultPayload::Stream(
195-
futures::stream::once(async move { Ok(blob) }).boxed(),
196-
),
197-
meta: object_metadata,
198-
range,
199-
attributes: Default::default(),
200-
})
201-
}
202-
203148
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
204149
if options.if_match.is_some() || options.if_none_match.is_some() {
205150
return Err(Error::Generic {
@@ -249,51 +194,40 @@ impl ObjectStore for HadoopFileSystem {
249194
})
250195
}
251196

252-
async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
197+
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
253198
let hdfs = self.hdfs.clone();
254199
let location = HadoopFileSystem::path_to_filesystem(location);
200+
let ranges = ranges.to_vec();
255201

256202
maybe_spawn_blocking(move || {
257203
let file = hdfs.open(&location).map_err(to_error)?;
258-
let buf = Self::read_range(&range, &file)?;
204+
let result = ranges
205+
.iter()
206+
.map(|range| Self::read_range(range, &file))
207+
.collect::<Result<Vec<_>>>()?;
259208
file.close().map_err(to_error)?;
260-
261-
Ok(buf)
209+
Ok(result)
262210
})
263211
.await
264212
}
265213

266-
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
267-
coalesce_ranges(
268-
ranges,
269-
|range| self.get_range(location, range),
270-
HDFS_COALESCE_DEFAULT,
271-
)
272-
.await
273-
}
274-
275-
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
276-
let hdfs = self.hdfs.clone();
277-
let hdfs_root = self.hdfs.url().to_owned();
278-
let location = HadoopFileSystem::path_to_filesystem(location);
279-
280-
maybe_spawn_blocking(move || {
281-
let file_status = hdfs.get_file_status(&location).map_err(to_error)?;
282-
Ok(convert_metadata(file_status, &hdfs_root))
283-
})
284-
.await
285-
}
286-
287-
async fn delete(&self, location: &Path) -> Result<()> {
214+
fn delete_stream(
215+
&self,
216+
locations: BoxStream<'static, Result<Path>>,
217+
) -> BoxStream<'static, Result<Path>> {
288218
let hdfs = self.hdfs.clone();
289-
let location = HadoopFileSystem::path_to_filesystem(location);
290-
291-
maybe_spawn_blocking(move || {
292-
hdfs.delete(&location, false).map_err(to_error)?;
293-
294-
Ok(())
295-
})
296-
.await
219+
locations
220+
.map(move |location| {
221+
let hdfs = hdfs.clone();
222+
maybe_spawn_blocking(move || {
223+
let location = location?;
224+
let fs_path = HadoopFileSystem::path_to_filesystem(&location);
225+
hdfs.delete(&fs_path, false).map_err(to_error)?;
226+
Ok(location)
227+
})
228+
})
229+
.buffered(10)
230+
.boxed()
297231
}
298232

299233
/// List all of the leaf files under the prefix path.
@@ -402,61 +336,33 @@ impl ObjectStore for HadoopFileSystem {
402336
.await
403337
}
404338

405-
/// Copy an object from one path to another.
406-
/// If there exists an object at the destination, it will be overwritten.
407-
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
339+
async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
408340
let hdfs = self.hdfs.clone();
409341
let from = HadoopFileSystem::path_to_filesystem(from);
410342
let to = HadoopFileSystem::path_to_filesystem(to);
411343

412344
maybe_spawn_blocking(move || {
413-
// We need to make sure the source exist
414345
if !hdfs.exist(&from) {
415346
return Err(Error::NotFound {
416347
path: from.clone(),
417348
source: Box::new(HdfsErr::FileNotFound(from)),
418349
});
419350
}
420-
// Delete destination if exists
421-
if hdfs.exist(&to) {
422-
hdfs.delete(&to, false).map_err(to_error)?;
423-
}
424-
425-
fs_hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to)
426-
.map_err(to_error)?;
427-
428-
Ok(())
429-
})
430-
.await
431-
}
432-
433-
/// It's only allowed for the same HDFS
434-
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
435-
let hdfs = self.hdfs.clone();
436-
let from = HadoopFileSystem::path_to_filesystem(from);
437-
let to = HadoopFileSystem::path_to_filesystem(to);
438-
439-
maybe_spawn_blocking(move || {
440-
hdfs.rename(&from, &to, true).map_err(to_error)?;
441-
442-
Ok(())
443-
})
444-
.await
445-
}
446-
447-
/// Copy an object from one path to another, only if destination is empty.
448-
/// Will return an error if the destination already has an object.
449-
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
450-
let hdfs = self.hdfs.clone();
451-
let from = HadoopFileSystem::path_to_filesystem(from);
452-
let to = HadoopFileSystem::path_to_filesystem(to);
453351

454-
maybe_spawn_blocking(move || {
455-
if hdfs.exist(&to) {
456-
return Err(Error::AlreadyExists {
457-
path: from,
458-
source: Box::new(HdfsErr::FileAlreadyExists(to)),
459-
});
352+
match options.mode {
353+
CopyMode::Overwrite => {
354+
if hdfs.exist(&to) {
355+
hdfs.delete(&to, false).map_err(to_error)?;
356+
}
357+
}
358+
CopyMode::Create => {
359+
if hdfs.exist(&to) {
360+
return Err(Error::AlreadyExists {
361+
path: from,
362+
source: Box::new(HdfsErr::FileAlreadyExists(to)),
363+
});
364+
}
365+
}
460366
}
461367

462368
fs_hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to)

0 commit comments

Comments
 (0)