Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@internxt/inxt-js",
"author": "Internxt <hello@internxt.com>",
"version": "3.2.0",
"version": "3.2.1",
"description": "",
"main": "build/index.js",
"types": "build/index.d.ts",
Expand Down
11 changes: 0 additions & 11 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ type GetBucketsCallback = (err: Error | null, result: any) => void;

type GetBucketIdCallback = (err: Error | null, result: any) => void;

// eslint-disable-next-line @typescript-eslint/no-unused-vars
type DeleteBucketCallback = (err: Error | null, result: any) => void;

type ListFilesCallback = (err: Error | null, result: any) => void;

type DeleteFileCallback = (err: Error | null, result: any) => void;
Expand Down Expand Up @@ -146,10 +143,6 @@ export class Environment {
cb(Error('Not implemented yet'), null);
}

setEncryptionKey(newEncryptionKey: string): void {
this.config.encryptionKey = newEncryptionKey;
}

upload: UploadStrategyFunction = async (bucketId: string, opts: UploadOptions) => {
if (!this.config.encryptionKey) {
throw Error('Mnemonic was not provided, please, provide a mnemonic');
Expand Down Expand Up @@ -267,10 +260,6 @@ export class Environment {
state.stop();
}

uploadCancel(state: ActionState): void {
state.stop();
}

renameFile(bucketId: string, fileId: string, newPlainName: string): Promise<void> {
const mnemonic: string | undefined = this.config.encryptionKey;

Expand Down
79 changes: 2 additions & 77 deletions src/lib/INXTRequest.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import { ClientRequest } from 'http';
import { EventEmitter } from 'events';
import { Readable } from 'stream';
import axios, { AxiosRequestConfig, AxiosResponse, Canceler } from 'axios';
import axios, { AxiosRequestConfig } from 'axios';

import { request, streamRequest } from '../services/request';
import { ProxyManager, getProxy } from '../services/proxy';
import { request } from '../services/request';
import { EnvironmentConfig } from '../api';

export enum Methods {
Expand All @@ -18,19 +16,12 @@ export enum Methods {
export class INXTRequest extends EventEmitter {
private req: Promise<any> | ClientRequest | undefined;
private config: EnvironmentConfig;
private cancel: Canceler;
private useProxy: boolean;
private streaming = false;

method: Methods;
targetUrl: string;
params: AxiosRequestConfig;

static Events = {
UploadProgress: 'upload-progress',
DownloadProgress: 'download-progress',
};

constructor(
config: EnvironmentConfig,
method: Methods,
Expand All @@ -45,14 +36,11 @@ export class INXTRequest extends EventEmitter {
this.targetUrl = targetUrl;
this.useProxy = useProxy ?? false;
this.params = params;

this.cancel = () => null;
}

start<K>(): Promise<K> {
// TODO: Abstract from axios
const source = axios.CancelToken.source();
this.cancel = source.cancel;

const cancelToken = source.token;

Expand All @@ -66,67 +54,4 @@ export class INXTRequest extends EventEmitter {

return this.req;
}

async stream<K>(content: Readable, size: number): Promise<AxiosResponse<K>>;
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async stream<K>(): Promise<Readable>;
async stream<K>(content?: any, size?: number): Promise<any> {
if (size) {
return this.postStream<K>(content, size);
}

return this.getStream();
}

private async getStream(): Promise<Readable> {
this.streaming = true;

let proxy: ProxyManager | undefined;

if (this.useProxy) {
proxy = await getProxy();
}

const targetUrl = `${proxy && proxy.url ? proxy.url + '/' : ''}${this.targetUrl}`;

return streamRequest(targetUrl);
}

private async postStream<K>(content: Readable, size: number): Promise<K> {
this.streaming = true;

let proxy: ProxyManager | undefined;

if (this.useProxy) {
proxy = await getProxy();
}

const targetUrl = `${proxy && proxy.url ? proxy.url + '/' : ''}${this.targetUrl}`;

return axios
.post<K>(targetUrl, content, {
maxContentLength: Infinity,
headers: {
'Content-Type': 'application/octet-stream',
'Content-Length': size.toString(),
},
})
.then((res) => {
proxy?.free();

return res as unknown as K;
});
}

abort() {
if (this.streaming && this.req instanceof ClientRequest) {
return this.req.destroy();
}

this.cancel();
}

isCancelled(err: Error): boolean {
return axios.isCancel(err);
}
}
17 changes: 6 additions & 11 deletions src/lib/core/upload/multipart.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,14 @@ export async function uploadParts(
let partChunks: Buffer[] = [];
let uploadPartError: Error | null = null;

const uploadQueue = queue(async (part: PartUpload, callback) => {
const uploadQueue = queue(async (part: PartUpload) => {
logger.debug('Uploading part %s of %s => %s bytes', part.source.index, partUrls.length, part.source.size);
try {
const etag = await uploadPart(part.url, part.source, signal);

if (!etag) {
throw new Error('ETag header was not returned');
}
parts.push({ PartNumber: part.source.index, ETag: etag });
callback();
} catch (err) {
callback(err as Error);
const etag = await uploadPart(part.url, part.source, signal);

if (!etag) {
throw new Error('ETag header was not returned');
}
parts.push({ PartNumber: part.source.index, ETag: etag });
}, concurrency);

uploadQueue.error((err) => {
Expand Down
57 changes: 0 additions & 57 deletions src/services/request.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import * as url from 'url';
import * as https from 'https';
import * as http from 'http';
import { Readable } from 'stream';
import { ClientRequest, IncomingMessage } from 'http';
import axios, { AxiosRequestConfig, AxiosResponse } from 'axios';

import { EnvironmentConfig } from '../api';
Expand Down Expand Up @@ -45,61 +43,6 @@ export async function request(
});
}

export function streamRequest(targetUrl: string, timeoutSeconds?: number): Readable {
const uriParts = url.parse(targetUrl);
let downloader: ClientRequest | null = null;

function _createDownloadStream(): ClientRequest {
const requestOpts = {
protocol: uriParts.protocol,
hostname: uriParts.hostname,
port: uriParts.port,
path: uriParts.path,
headers: {
Accept: 'application/octet-stream',
},
};

return uriParts.protocol === 'http:' ? http.get(requestOpts) : https.get(requestOpts);
}

return new Readable({
read() {
if (!downloader) {
downloader = _createDownloadStream();

if (timeoutSeconds) {
downloader.setTimeout(timeoutSeconds * 1000, () => {
downloader?.destroy(Error(`Request timeouted after ${timeoutSeconds} seconds`));
});
}

this.once('signal', (message: string) => {
if (message === 'Destroy request') {
downloader?.destroy();
}

this.destroy();
});

downloader
.on('response', (res: IncomingMessage) => {
res
.on('data', this.push.bind(this))
.on('error', this.emit.bind(this, 'error'))
.on('end', () => {
this.push.bind(this, null);
this.emit('end');
})
.on('close', this.emit.bind(this, 'close'));
})
.on('error', this.emit.bind(this, 'error'))
.on('timeout', () => this.emit('error', Error('Request timeout')));
}
},
});
}

export async function getStream(url: string, config = { useProxy: false }): Promise<Readable> {
let targetUrl = url;
let free: undefined | (() => void);
Expand Down
3 changes: 0 additions & 3 deletions vitest.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,4 @@ export default defineConfig({
},
restoreMocks: true,
},
oxc: {
target: 'es2015',
},
});
Loading