Skip to content

Commit 00819d0

Browse files
authored
Fixes and updates for file uploads (#42)
* vars.h5 attr fix * Increment version number to 0.4.1 * clean query * Update cytetype.R * Update cytetype.R * Update PrepareCyteTypeR.Rd * Update client.R * Update api.R * Update api.R * Update R-CMD-check.yaml * Update DESCRIPTION * man * Update api.R * Update artifacts.R * Update client.R * Update cytetype.R * Update seurat_helpers.R * test updates
1 parent a5142bf commit 00819d0

12 files changed

Lines changed: 616 additions & 163 deletions

File tree

.github/workflows/R-CMD-check.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
# Workflow derived from https://github.com/r-lib/actions/tree/v2/examples
22
# Need help debugging build failures? Start at https://github.com/r-lib/actions#where-to-find-help
3+
34
on:
45
push:
56
branches: [main, master]
67
pull_request:
8+
types: [opened, synchronize, reopened, ready_for_review]
79

810
name: R-CMD-check.yaml
911

@@ -12,6 +14,7 @@ permissions: read-all
1214
jobs:
1315
R-CMD-check:
1416
runs-on: ${{ matrix.config.os }}
17+
if: github.event_name != 'pull_request' || github.event.pull_request.draft == false
1518

1619
name: ${{ matrix.config.os }} (${{ matrix.config.r }})
1720

@@ -27,7 +30,7 @@ jobs:
2730
env:
2831
GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }}
2932
R_KEEP_PKG_SOURCE: yes
30-
33+
3134
steps:
3235
- uses: actions/checkout@v4
3336

DESCRIPTION

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Package: CyteTypeR
22
Title: CyteType for R
3-
Version: 0.4.0
3+
Version: 0.4.1
44
Description: CyteTypeR is the R version of the CyteType Python package.
55
Authors@R:
66
person("Nygen Analytics AB", , ,"contact@nygen.io", role = c("aut", "cre"))
@@ -26,8 +26,10 @@ Imports:
2626
tidyr
2727
Suggests:
2828
duckdb,
29+
future,
2930
furrr,
3031
knitr,
32+
progressr,
3133
rhdf5filters,
3234
rmarkdown,
3335
testthat (>= 3.0.0)

R/api.R

Lines changed: 53 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,28 @@
1-
# API Response Helper for CyteType
1+
# HTTP primitives, request utilities, and shared constants for CyteType API communication.
2+
3+
# Per-artifact upload ceilings, in bytes. The values are built from doubles
# (1024^3) so they never pass through R's 32-bit integer type.
.MAX_UPLOAD_BYTES <- list(
  obs_duckdb = 2 * 1024^3,  # 2 GB
  vars_h5 = 50 * 1024^3     # 50 GB
)

# Chunked upload retry schedule: seconds to wait after the 1st, 2nd and 3rd
# failed attempt.
.CHUNK_UPLOAD_BACKOFF_SECS <- as.integer(c(1, 5, 20))

# HTTP status codes that are treated as transient when uploading a chunk.
.CHUNK_UPLOAD_TRANSIENT_STATUSES <- c(500L, 502:504)
12+
13+
# Join URL path segments with "/" (file.path() is avoided because it can emit
# backslashes on Windows).
#
# ...: path segments; each is coerced to character and has its leading and
#      trailing slashes removed. Segments that end up empty are dropped.
#
# Returns a single string with the remaining segments separated by "/".
.url_path <- function(...) {
  segments <- c(...)
  trimmed <- vapply(
    segments,
    function(seg) gsub("^/+|/+$", "", as.character(seg)),
    character(1)
  )
  kept <- trimmed[nzchar(trimmed)]
  paste(kept, collapse = "/")
}
18+
19+
# Build an httr2 request for `path` under `base_url`, optionally authenticated.
#
# base_url:   API root. Trailing slashes are removed so that exactly one "/"
#             separates it from `path`.
# path:       endpoint path. Leading slashes are removed for the same reason.
# auth_token: bearer token, or NULL to leave the request unauthenticated.
#
# Returns the request object; nothing is sent here.
.make_req <- function(base_url, path, auth_token) {
  # A bare paste0(base_url, "/", path) yields "//" whenever base_url ends in
  # "/" or path starts with "/", so normalise both sides of the join.
  url <- paste0(sub("/+$", "", base_url), "/", sub("^/+", "", path))
  req <- request(url)
  if (!is.null(auth_token)) {
    req <- req_headers(req, Authorization = paste("Bearer", auth_token))
  }
  req
}
226

327
#' @importFrom httr2 req_auth_bearer_token req_body_json req_headers req_method req_perform req_timeout request resp_body_json resp_body_string resp_status
428
.api_response_helper <- function(job_id, api_url, req_item, auth_token = NULL) {
@@ -77,89 +101,36 @@
77101
})
78102
}
79103

80-
# Make Request for Job Results
81-
.make_results_request <- function(job_id, api_url, auth_token = NULL) {
82-
83-
# Helper for consistent responses
84-
make_response <- function(status, result = NULL, message, raw = NULL) {
85-
list(status = status, result = result, message = message, raw_response = raw)
104+
# PUT raw bytes to a presigned URL with retry, ETag validation, and proper error classification.
105+
.put_to_presigned_url <- function(presigned_url, chunk_data, timeout_seconds) {
106+
resp <- request(presigned_url) |>
107+
req_method("PUT") |>
108+
httr2::req_body_raw(chunk_data, type = "application/octet-stream") |>
109+
req_timeout(timeout_seconds) |>
110+
httr2::req_error(is_error = function(resp) FALSE) |>
111+
httr2::req_retry(
112+
max_tries = length(.CHUNK_UPLOAD_BACKOFF_SECS) + 1L,
113+
retry_on_failure = TRUE,
114+
is_transient = function(resp) resp_status(resp) %in% .CHUNK_UPLOAD_TRANSIENT_STATUSES,
115+
backoff = function(tries) .CHUNK_UPLOAD_BACKOFF_SECS[min(tries, length(.CHUNK_UPLOAD_BACKOFF_SECS))]
116+
) |>
117+
req_perform()
118+
119+
status <- resp_status(resp)
120+
if (status >= 400L) {
121+
stop(cytetype_api_error(
122+
message = paste0("Presigned upload rejected with HTTP ", status),
123+
call = "api"
124+
))
86125
}
87126

88-
tryCatch({
89-
# Get job status
90-
status_resp <- .api_response_helper(job_id, api_url, 'status', auth_token)
91-
92-
# Handle 404 immediately
93-
if (status_resp$status_code == 404) {
94-
return(make_response("not_found", message = "Job not found"))
95-
}
96-
97-
job_status <- status_resp$data$jobStatus
98-
status_data <- status_resp$data
99-
100-
# Process based on job status
101-
if (job_status == "completed") {
102-
# Try to get results
103-
results_resp <- tryCatch(
104-
.api_response_helper(job_id, api_url, 'results', auth_token),
105-
error = function(e) {
106-
make_response(
107-
"failed",
108-
message = paste("Job completed but results unavailable:", e$message),
109-
raw = status_data
110-
)
111-
}
112-
)
113-
114-
if (!is.null(results_resp$status) && results_resp$status == "failed") {
115-
return(results_resp)
116-
}
117-
118-
if (results_resp$status_code == 404) {
119-
return(make_response(
120-
"failed",
121-
message = "Job completed but results are unavailable",
122-
raw = status_data
123-
))
124-
}
125-
126-
return(make_response(
127-
"completed",
128-
result = results_resp$data,
129-
message = "Job completed successfully",
130-
raw = status_data
131-
))
132-
}
133-
134-
if (job_status == "failed") {
135-
return(make_response(
136-
"failed",
137-
message = "Job failed",
138-
raw = status_data
139-
))
140-
}
141-
142-
if (job_status %in% c("processing", "pending")) {
143-
return(make_response(
144-
job_status,
145-
message = paste("Job is", job_status),
146-
raw = status_data
147-
))
148-
}
149-
150-
# Unknown status
151-
return(make_response(
152-
"unknown",
153-
message = paste("Unknown job status:", job_status),
154-
raw = status_data
127+
etag <- httr2::resp_header(resp, "ETag")
128+
if (is.null(etag) || !nzchar(etag)) {
129+
stop(cytetype_api_error(
130+
message = "Presigned PUT succeeded but response is missing ETag header",
131+
call = "network"
155132
))
133+
}
156134

157-
}, error = function(e) {
158-
# Handle any unexpected errors
159-
return(make_response(
160-
"error",
161-
message = paste("Error checking job status:", e$message)
162-
))
163-
})
135+
etag
164136
}
165-

R/artifacts.R

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -64,33 +64,35 @@
6464
rhdf5::h5writeDataset(.as_string_values(vec), h5loc = fid, name = col_path)
6565
}
6666
did <- rhdf5::H5Dopen(fid, col_path)
67-
rhdf5::h5writeAttribute(col_name, h5obj = did, name = "source_name")
68-
rhdf5::h5writeAttribute(source_dtype, h5obj = did, name = "source_dtype")
67+
rhdf5::h5writeAttribute(col_name, h5obj = did, name = "source_name", asScalar = TRUE)
68+
rhdf5::h5writeAttribute(source_dtype, h5obj = did, name = "source_dtype", asScalar = TRUE)
6969
rhdf5::H5Dclose(did)
7070
}
7171
invisible(NULL)
7272
}
7373

7474
# Write a sparse matrix under a named HDF5 group.
7575
# csr = FALSE (default): CSC — indptr over columns (genes), indices are row (cell) indices.
76-
# Input m must be cells × genes (n_obs × n_vars).
76+
# Input m must be cells x genes (n_obs x n_vars).
7777
# csr = TRUE: CSR — indptr over rows (cells), indices are column (gene) indices.
78-
# Input m must be genes × cells (n_vars × n_obs) as returned by Seurat GetAssayData.
79-
# Stored via CSC(genes × cells) ≡ CSR(cells × genes); no transpose needed.
78+
# Input m must be genes x cells (n_vars x n_obs) as returned by Seurat GetAssayData.
79+
# Stored via CSC(genes x cells) == CSR(cells x genes); no transpose needed.
8080
.write_sparse_group <- function(fid, group, m, n_obs, col_batch, chunk_size,
81-
csr = FALSE, data_h5type = "H5T_NATIVE_FLOAT") {
81+
csr = FALSE, data_h5type = "H5T_NATIVE_FLOAT",
82+
pb_id = NULL) {
8283
if (csr) {
8384
m <- as(m, "CsparseMatrix")
8485
n_vars <- nrow(m)
8586
} else {
8687
n_vars <- ncol(m)
8788
}
8889
n_cols <- ncol(m)
90+
8991
rhdf5::h5createGroup(fid, group)
9092
gid <- rhdf5::H5Gopen(fid, group)
9193
on.exit(rhdf5::H5Gclose(gid), add = TRUE)
92-
rhdf5::h5writeAttribute(as.integer(n_obs), h5obj = gid, name = "n_obs")
93-
rhdf5::h5writeAttribute(as.integer(n_vars), h5obj = gid, name = "n_vars")
94+
rhdf5::h5writeAttribute(as.integer(n_obs), h5obj = gid, name = "n_obs", asScalar = TRUE)
95+
rhdf5::h5writeAttribute(as.integer(n_vars), h5obj = gid, name = "n_vars", asScalar = TRUE)
9496

9597
rhdf5::h5createDataset(fid, paste0(group, "/indices"), dims = 0L,
9698
maxdims = rhdf5::H5Sunlimited(), chunk = chunk_size,
@@ -107,7 +109,7 @@
107109
chunk <- as(m[, start:end, drop = FALSE], "CsparseMatrix")
108110
chunk_indices <- as.integer(chunk@i)
109111
chunk_data <- if (data_h5type == "H5T_NATIVE_INT32") as.integer(chunk@x) else as.numeric(chunk@x)
110-
chunk_nnz <- length(chunk_indices)
112+
chunk_nnz <- length(chunk_indices)
111113
if (chunk_nnz > 0L) {
112114
rhdf5::h5set_extent(fid, paste0(group, "/indices"), current_size + chunk_nnz)
113115
rhdf5::h5writeDataset(chunk_indices, h5loc = fid, name = paste0(group, "/indices"),
@@ -119,6 +121,7 @@
119121
}
120122
new_indptr <- as.numeric(chunk@p[-1L]) + indptr[length(indptr)]
121123
indptr <- c(indptr, new_indptr)
124+
if (!is.null(pb_id)) cli::cli_progress_update(id = pb_id)
122125
}
123126

124127
rhdf5::h5createDataset(fid, paste0(group, "/indptr"), dims = length(indptr),
@@ -142,6 +145,21 @@
142145
}
143146
chunk_size <- max(1L, min(n_obs * 10L, min_chunk_size))
144147

148+
raw_col_batch <- if (!is.null(raw_mat)) {
149+
max(1L, as.integer(100000000 / max(nrow(raw_mat), 1)))
150+
} else NULL
151+
152+
vars_n_batches <- length(seq(1L, n_vars, by = col_batch))
153+
raw_n_batches <- if (!is.null(raw_mat)) length(seq(1L, ncol(raw_mat), by = raw_col_batch)) else 0L
154+
total_batches <- vars_n_batches + raw_n_batches
155+
156+
pb_label <- if (raw_n_batches > 0L) "Writing vars.h5 (normalized + raw)" else "Writing vars.h5"
157+
pb_id <- cli::cli_progress_bar(
158+
format = paste0(pb_label, " {cli::pb_bar} {cli::pb_current}/{cli::pb_total} batches ({cli::pb_rate})"),
159+
total = total_batches,
160+
clear = FALSE
161+
)
162+
145163
if (file.exists(out_file) && !file.remove(out_file)) {
146164
stop("Could not remove existing file: ", out_file)
147165
}
@@ -150,16 +168,16 @@
150168
fid <- rhdf5::H5Fopen(out_file, flags = "H5F_ACC_RDWR")
151169
on.exit(rhdf5::H5Fclose(fid), add = TRUE)
152170

153-
.write_sparse_group(fid, "vars", mat, n_obs, col_batch, chunk_size)
171+
.write_sparse_group(fid, "vars", mat, n_obs, col_batch, chunk_size, pb_id = pb_id)
154172

155173
if (!is.null(raw_mat)) {
156-
raw_col_batch <- max(1L, as.integer(100000000 / max(nrow(raw_mat), 1)))
157174
.write_sparse_group(fid, "raw", raw_mat, n_obs, raw_col_batch, chunk_size,
158-
csr = TRUE, data_h5type = "H5T_NATIVE_INT32")
175+
csr = TRUE, data_h5type = "H5T_NATIVE_INT32", pb_id = pb_id)
159176
}
160177

161178
if (!is.null(feature_df)) {
162-
.write_var_metadata(fid, n_cols = n_vars, feature_df = feature_df, feature_names = feature_names)
179+
.write_var_metadata(fid, n_cols = n_vars, feature_df = feature_df,
180+
feature_names = feature_names)
163181
}
164182

165183
invisible(out_file)
@@ -187,6 +205,8 @@
187205
}
188206
}
189207

208+
cli::cli_progress_step("Writing obs.duckdb")
209+
190210
if (file.exists(out_file)) file.remove(out_file)
191211
config <- list(threads = as.character(threads), memory_limit = memory_limit, temp_directory = temp_directory)
192212
con <- duckdb::dbConnect(duckdb::duckdb(), out_file, config = config)

0 commit comments

Comments
 (0)