Skip to content

Commit dd4c7eb

Browse files
committed
Add fast-lane apply for small sync payloads
1 parent 90b0cd1 commit dd4c7eb

1 file changed

Lines changed: 128 additions & 33 deletions

File tree

src/network/network.c

Lines changed: 128 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ static size_t cacert_len = sizeof(cacert_pem) - 1;
5656
#ifndef CLOUDSYNC_CURL_MAXLIFETIME_CONN_SECONDS
5757
#define CLOUDSYNC_CURL_MAXLIFETIME_CONN_SECONDS 60L
5858
#endif
59+
#ifndef CLOUDSYNC_NETWORK_FAST_LANE_MAX_BLOB_SIZE
60+
#define CLOUDSYNC_NETWORK_FAST_LANE_MAX_BLOB_SIZE (32 * 1024)
61+
#endif
5962

6063
#define DEFAULT_SYNC_WAIT_MS 100
6164
#define DEFAULT_SYNC_MAX_RETRIES 1
@@ -1215,6 +1218,63 @@ static char *json_extract_failure_stage(const char *json, size_t json_len, const
12151218
return stage;
12161219
}
12171220

1221+
static char *network_base64_encode(const unsigned char *src, size_t len) {
1222+
static const char table[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
1223+
1224+
if (!src && len > 0) return NULL;
1225+
if (len > (SIZE_MAX - 1) / 4 * 3) return NULL;
1226+
1227+
size_t out_len = 4 * ((len + 2) / 3);
1228+
char *out = cloudsync_memory_alloc((uint64_t)out_len + 1);
1229+
if (!out) return NULL;
1230+
1231+
size_t i = 0;
1232+
size_t j = 0;
1233+
while (i < len) {
1234+
uint32_t octet_a = i < len ? src[i++] : 0;
1235+
uint32_t octet_b = i < len ? src[i++] : 0;
1236+
uint32_t octet_c = i < len ? src[i++] : 0;
1237+
uint32_t triple = (octet_a << 16) | (octet_b << 8) | octet_c;
1238+
1239+
out[j++] = table[(triple >> 18) & 0x3f];
1240+
out[j++] = table[(triple >> 12) & 0x3f];
1241+
out[j++] = table[(triple >> 6) & 0x3f];
1242+
out[j++] = table[triple & 0x3f];
1243+
}
1244+
1245+
if (len % 3 == 1) {
1246+
out[out_len - 1] = '=';
1247+
out[out_len - 2] = '=';
1248+
} else if (len % 3 == 2) {
1249+
out[out_len - 1] = '=';
1250+
}
1251+
1252+
out[out_len] = '\0';
1253+
return out;
1254+
}
1255+
1256+
static char *network_apply_json_payload(const char *transport_key, const char *transport_value,
1257+
int db_version_min, int db_version_max) {
1258+
if (!transport_key || !transport_value) return NULL;
1259+
1260+
char *escaped_value = json_escape_string(transport_value);
1261+
if (!escaped_value) return NULL;
1262+
1263+
size_t requested = strlen(transport_key) + strlen(escaped_value) + 128;
1264+
char *json_payload = cloudsync_memory_alloc((uint64_t)requested);
1265+
if (!json_payload) {
1266+
cloudsync_memory_free(escaped_value);
1267+
return NULL;
1268+
}
1269+
1270+
snprintf(json_payload, requested,
1271+
"{\"%s\":\"%s\", \"dbVersionMin\":%d, \"dbVersionMax\":%d}",
1272+
transport_key, escaped_value, db_version_min, db_version_max);
1273+
1274+
cloudsync_memory_free(escaped_value);
1275+
return json_payload;
1276+
}
1277+
12181278
static const char *network_compute_status(int64_t last_optimistic, int64_t last_confirmed,
12191279
int gaps_size, int64_t local_version) {
12201280
if (last_optimistic < 0 || last_confirmed < 0) return "error";
@@ -1294,43 +1354,78 @@ int cloudsync_network_send_changes_internal (sqlite3_context *context, int argc,
12941354

12951355
NETWORK_RESULT res;
12961356
if (blob != NULL && blob_size > 0) {
1297-
// there is data to send
1298-
res = network_receive_buffer(netdata, netdata->upload_endpoint, netdata->authentication, true, false, NULL, cloudsync_default_headers, ARRAY_LEN(cloudsync_default_headers));
1299-
if (res.code != CLOUDSYNC_NETWORK_BUFFER) {
1357+
int db_version_min = db_version+1;
1358+
int db_version_max = (int)new_db_version;
1359+
if (db_version_min > db_version_max) db_version_min = db_version_max;
1360+
1361+
#ifdef CLOUDSYNC_NETWORK_TRACE
1362+
fprintf(stderr,
1363+
"[cloudsync-network] send_changes blob_size=%d fast-lane:%s\n",
1364+
blob_size,
1365+
blob_size <= CLOUDSYNC_NETWORK_FAST_LANE_MAX_BLOB_SIZE ? "true" : "false");
1366+
#endif
1367+
1368+
if (blob_size <= CLOUDSYNC_NETWORK_FAST_LANE_MAX_BLOB_SIZE) {
1369+
char *blob_base64 = network_base64_encode((const unsigned char *)blob, (size_t)blob_size);
13001370
cloudsync_memory_free(blob);
1301-
network_result_to_sqlite_error(context, res, "cloudsync_network_send_changes unable to receive upload URL");
1302-
network_result_cleanup(&res);
1303-
return SQLITE_ERROR;
1304-
}
1305-
1306-
char *s3_url = json_extract_string(res.buffer, res.blen, "url");
1307-
if (!s3_url) {
1371+
if (!blob_base64) {
1372+
sqlite3_result_error(context, "cloudsync_network_send_changes: unable to encode BLOB changes.", -1);
1373+
sqlite3_result_error_code(context, SQLITE_NOMEM);
1374+
return SQLITE_NOMEM;
1375+
}
1376+
1377+
char *json_payload = network_apply_json_payload("blob", blob_base64, db_version_min, db_version_max);
1378+
cloudsync_memory_free(blob_base64);
1379+
if (!json_payload) {
1380+
sqlite3_result_error(context, "cloudsync_network_send_changes: unable to allocate apply request payload.", -1);
1381+
sqlite3_result_error_code(context, SQLITE_NOMEM);
1382+
return SQLITE_NOMEM;
1383+
}
1384+
1385+
res = network_receive_buffer(netdata, netdata->apply_endpoint, netdata->authentication, true, true, json_payload, cloudsync_default_headers, ARRAY_LEN(cloudsync_default_headers));
1386+
cloudsync_memory_free(json_payload);
1387+
} else {
1388+
// bulk lane: stage the payload through the upload endpoint and apply by URL
1389+
res = network_receive_buffer(netdata, netdata->upload_endpoint, netdata->authentication, true, false, NULL, cloudsync_default_headers, ARRAY_LEN(cloudsync_default_headers));
1390+
if (res.code != CLOUDSYNC_NETWORK_BUFFER) {
1391+
cloudsync_memory_free(blob);
1392+
network_result_to_sqlite_error(context, res, "cloudsync_network_send_changes unable to receive upload URL");
1393+
network_result_cleanup(&res);
1394+
return SQLITE_ERROR;
1395+
}
1396+
1397+
char *s3_url = json_extract_string(res.buffer, res.blen, "url");
1398+
if (!s3_url) {
1399+
cloudsync_memory_free(blob);
1400+
sqlite3_result_error(context, "cloudsync_network_send_changes: missing 'url' in upload response.", -1);
1401+
network_result_cleanup(&res);
1402+
return SQLITE_ERROR;
1403+
}
1404+
bool sent = network_send_buffer(netdata, s3_url, NULL, blob, blob_size);
13081405
cloudsync_memory_free(blob);
1309-
sqlite3_result_error(context, "cloudsync_network_send_changes: missing 'url' in upload response.", -1);
1310-
network_result_cleanup(&res);
1311-
return SQLITE_ERROR;
1312-
}
1313-
bool sent = network_send_buffer(netdata, s3_url, NULL, blob, blob_size);
1314-
cloudsync_memory_free(blob);
1315-
if (sent == false) {
1406+
if (sent == false) {
1407+
cloudsync_memory_free(s3_url);
1408+
network_result_to_sqlite_error(context, res, "cloudsync_network_send_changes unable to upload BLOB changes to remote host.");
1409+
network_result_cleanup(&res);
1410+
return SQLITE_ERROR;
1411+
}
1412+
1413+
char *json_payload = network_apply_json_payload("url", s3_url, db_version_min, db_version_max);
13161414
cloudsync_memory_free(s3_url);
1317-
network_result_to_sqlite_error(context, res, "cloudsync_network_send_changes unable to upload BLOB changes to remote host.");
1415+
if (!json_payload) {
1416+
sqlite3_result_error(context, "cloudsync_network_send_changes: unable to allocate apply request payload.", -1);
1417+
sqlite3_result_error_code(context, SQLITE_NOMEM);
1418+
network_result_cleanup(&res);
1419+
return SQLITE_NOMEM;
1420+
}
1421+
1422+
// free res
13181423
network_result_cleanup(&res);
1319-
return SQLITE_ERROR;
1424+
1425+
// notify remote host that we successfully uploaded changes
1426+
res = network_receive_buffer(netdata, netdata->apply_endpoint, netdata->authentication, true, true, json_payload, cloudsync_default_headers, ARRAY_LEN(cloudsync_default_headers));
1427+
cloudsync_memory_free(json_payload);
13201428
}
1321-
1322-
int db_version_min = db_version+1;
1323-
int db_version_max = (int)new_db_version;
1324-
if (db_version_min > db_version_max) db_version_min = db_version_max;
1325-
char json_payload[4096];
1326-
snprintf(json_payload, sizeof(json_payload), "{\"url\":\"%s\", \"dbVersionMin\":%d, \"dbVersionMax\":%d}", s3_url, db_version_min, db_version_max);
1327-
cloudsync_memory_free(s3_url);
1328-
1329-
// free res
1330-
network_result_cleanup(&res);
1331-
1332-
// notify remote host that we succesfully uploaded changes
1333-
res = network_receive_buffer(netdata, netdata->apply_endpoint, netdata->authentication, true, true, json_payload, cloudsync_default_headers, ARRAY_LEN(cloudsync_default_headers));
13341429
} else {
13351430
// there is no data to send, just check the status to update the db_version value in settings and to reply the status
13361431
new_db_version = db_version;
@@ -1351,7 +1446,7 @@ int cloudsync_network_send_changes_internal (sqlite3_context *context, int argc,
13511446
apply_failure_json = json_extract_failure_stage(res.buffer, res.blen, "apply");
13521447
check_failure_json = json_extract_failure_stage(res.buffer, res.blen, "check");
13531448
} else if (res.code != CLOUDSYNC_NETWORK_OK) {
1354-
network_result_to_sqlite_error(context, res, "cloudsync_network_send_changes unable to notify BLOB upload to remote host.");
1449+
network_result_to_sqlite_error(context, res, "cloudsync_network_send_changes unable to apply changes to remote host.");
13551450
network_result_cleanup(&res);
13561451
return SQLITE_ERROR;
13571452
}

0 commit comments

Comments
 (0)