Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions sdk/packages/flight-sql-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use napi_derive::{module_exports, napi};
use snafu::prelude::*;
use tokio::sync::Mutex;
use tonic::transport::Channel;
use tracing::info;
use tracing::{info, warn};
use tracing_subscriber::EnvFilter;

use crate::conversion::record_batches_to_buffer;
Expand All @@ -29,14 +29,19 @@ use crate::flight_client::{execute_flight, setup_client, ClientOptions};

fn init_logging() {
// Set up a subscriber that logs to stdout
let subscriber = tracing_subscriber::FmtSubscriber::builder()
if let Err(e) = tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
)
.finish();

// Set the global default subscriber
tracing::subscriber::set_global_default(subscriber).expect("Failed to set tracing subscriber");
Comment thread
stbrody marked this conversation as resolved.
.try_init()
{
let error_msg = e.to_string();
if error_msg.contains("already been set") {
warn!("Tracing already initialized");
} else {
panic!("Failed to initialize tracing: {}", e);
}
}
}

#[module_exports]
Expand Down
47 changes: 47 additions & 0 deletions tests/suite/src/__tests__/correctness/fast/flight-sql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,53 @@ describe('flight sql', () => {
authenticatedDID = await randomDID()
}, 20000)

test('concurrent initialization across threads', async () => {
const { Worker } = await import('worker_threads');

interface WorkerMessage {
success: boolean;
error?: string;
}

// Create workers that will all try to load the module
const createWorker = (): Promise<WorkerMessage> => new Promise((resolve, reject) => {
const worker = new Worker(`
import { createFlightSqlClient } from '@ceramic-sdk/flight-sql-client';
import { parentPort } from 'worker_threads';
const OPTIONS = {
headers: new Array(),
username: undefined,
password: undefined,
token: undefined,
tls: false,
host: "",
port: 0,
}
try {
const client = createFlightSqlClient(OPTIONS);
parentPort.postMessage({ success: true });
} catch (error) {
parentPort.postMessage({ success: false, error: error.message });
}
`, {
eval: true,
});

worker.on('message', (data) => resolve(data as WorkerMessage));
worker.on('error', reject);
});

const results = await Promise.all([
createWorker(),
createWorker(),
]);

results.forEach(result => {
expect(result.success).toBe(true);
expect(result.error).toBeUndefined();
});
});

test('makes query', async () => {
const testModel: ModelDefinition = {
version: '2.0',
Expand Down