66 changes: 66 additions & 0 deletions .opencode/skills/data-parity/SKILL.md
@@ -71,6 +71,19 @@ WHERE table_schema = 'mydb' AND table_name = 'orders'
ORDER BY ordinal_position
```

```sql
-- SQL Server / Fabric
SELECT c.name AS column_name, tp.name AS data_type, c.is_nullable,
dc.definition AS column_default
FROM sys.columns c
INNER JOIN sys.types tp ON c.user_type_id = tp.user_type_id
INNER JOIN sys.objects o ON c.object_id = o.object_id
INNER JOIN sys.schemas s ON o.schema_id = s.schema_id
LEFT JOIN sys.default_constraints dc ON c.default_object_id = dc.object_id
WHERE s.name = 'dbo' AND o.name = 'orders'
ORDER BY c.column_id
```

```sql
-- ClickHouse
DESCRIBE TABLE source_db.events
```
@@ -409,3 +422,56 @@ Even when tables match perfectly, state what was checked:

**Silently excluding auto-timestamp columns without asking the user**
→ Always present detected auto-timestamp columns (Step 4) and get explicit confirmation. In migration scenarios, `created_at` should be *identical* — excluding it silently hides real bugs.

---

## SQL Server and Microsoft Fabric

### Minimum Version Requirements

| Component | Minimum Version | Why |
|---|---|---|
| **SQL Server** | 2022 (16.x) | `DATETRUNC()` used for date partitioning; `LEAST()`/`GREATEST()` used by Rust engine |
| **Azure SQL Database** | Any current version | Always has `DATETRUNC()` and `LEAST()` |
| **Microsoft Fabric** | Any current version | T-SQL surface includes all required functions |
| **mssql** (npm) | 12.0.0 | `ConnectionPool` isolation for concurrent connections, tedious 19 |
| **@azure/identity** (npm) | 4.0.0 | Required only for Azure AD authentication; tedious imports it internally |

> **Note:** Date partitioning (`partition_column` + `partition_granularity`) uses `DATETRUNC()` which is **not available on SQL Server 2019 or earlier**. Basic diff operations (joindiff, hashdiff, profile) work on older versions. If you need partitioned diffs on SQL Server < 2022, use numeric or categorical partitioning instead.
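On servers older than 2022, the classic `DATEADD`/`DATEDIFF` idiom can stand in for `DATETRUNC` when generating truncation expressions by hand. A minimal sketch, assuming a hypothetical `truncExpr` helper (not part of the engine):

```typescript
// Hypothetical helper: emit a T-SQL expression that truncates a datetime
// column to a granularity, preferring DATETRUNC on SQL Server 2022+.
type Granularity = "day" | "month" | "year";

function truncExpr(column: string, g: Granularity, supportsDatetrunc: boolean): string {
  if (supportsDatetrunc) {
    // Native on SQL Server 2022+, Azure SQL Database, and Fabric.
    return `DATETRUNC(${g}, ${column})`;
  }
  // Pre-2022 fallback: count whole periods since epoch 0, then add them back.
  const unit = g.toUpperCase();
  return `DATEADD(${unit}, DATEDIFF(${unit}, 0, ${column}), 0)`;
}
```

The fallback relies only on `DATEADD`/`DATEDIFF`, which exist on every supported SQL Server version.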

### Supported Configurations

| Warehouse Type | Authentication | Notes |
|---|---|---|
| `sqlserver` / `mssql` | User/password or Azure AD | On-prem or Azure SQL. SQL Server 2022+ required for date partitioning. |
| `fabric` | Azure AD only | Microsoft Fabric SQL endpoint. Always uses TLS encryption. |

### Connecting to Microsoft Fabric

Fabric uses the same TDS protocol as SQL Server — no separate driver needed. Configuration:

```
type: "fabric"
host: "<workspace-id>-<item-id>.datawarehouse.fabric.microsoft.com"
database: "<warehouse-name>"
authentication: "azure-active-directory-default" # recommended
```

Auth shorthands (mapped to full tedious type names):
- `CLI` or `default` → `azure-active-directory-default`
- `password` → `azure-active-directory-password`
- `service-principal` → `azure-active-directory-service-principal-secret`
- `msi` or `managed-identity` → `azure-active-directory-msi-vm`
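The mapping above mirrors the `AUTH_SHORTHANDS` table in the driver; a minimal sketch of the normalization:

```typescript
// Map user-facing shorthands to full tedious authentication type names.
// Already-full names (and undefined) pass through unchanged.
const AUTH_SHORTHANDS: Record<string, string> = {
  cli: "azure-active-directory-default",
  default: "azure-active-directory-default",
  password: "azure-active-directory-password",
  "service-principal": "azure-active-directory-service-principal-secret",
  serviceprincipal: "azure-active-directory-service-principal-secret",
  "managed-identity": "azure-active-directory-msi-vm",
  msi: "azure-active-directory-msi-vm",
};

function normalizeAuth(raw?: string): string | undefined {
  if (!raw) return undefined;
  return AUTH_SHORTHANDS[raw.toLowerCase()] ?? raw;
}
```

Matching is case-insensitive, so `CLI` and `cli` both resolve to `azure-active-directory-default`.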

Full Azure AD authentication types:
- `azure-active-directory-default` — auto-discovers credentials via `DefaultAzureCredential` (recommended; works with `az login`)
- `azure-active-directory-password` — username/password with `azure_client_id` and `azure_tenant_id`
- `azure-active-directory-access-token` — pre-obtained token (does **not** auto-refresh)
- `azure-active-directory-service-principal-secret` — service principal with `azure_client_id`, `azure_client_secret`, `azure_tenant_id`
- `azure-active-directory-msi-vm` / `azure-active-directory-msi-app-service` — managed identity

### Algorithm Behavior

- **Same-warehouse** MSSQL or Fabric → `joindiff` (single FULL OUTER JOIN, most efficient)
- **Cross-warehouse** MSSQL/Fabric ↔ other database → `hashdiff` (automatic when using `auto`)
- The Rust engine maps `sqlserver`/`mssql` to `tsql` dialect and `fabric` to `fabric` dialect — both generate valid T-SQL syntax with bracket quoting (`[schema].[table]`).
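The selection rule above can be sketched as follows. This is an illustrative assumption about the `auto` mode, not the engine's actual API; the `Conn` shape and `pickAlgorithm` helper are hypothetical:

```typescript
// Hypothetical connection descriptor for illustrating algorithm selection.
type Conn = { warehouse: string; host: string; database: string };

const TSQL_FAMILY = new Set(["sqlserver", "mssql", "fabric"]);

// joindiff needs both tables reachable from one T-SQL connection so a single
// FULL OUTER JOIN can compare them; anything cross-warehouse must hash rows
// on each side independently and compare the digests.
function pickAlgorithm(source: Conn, target: Conn): "joindiff" | "hashdiff" {
  const sameWarehouse =
    source.host === target.host && source.database === target.database;
  if (sameWarehouse && TSQL_FAMILY.has(source.warehouse) && TSQL_FAMILY.has(target.warehouse)) {
    return "joindiff";
  }
  return "hashdiff";
}
```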
79 changes: 70 additions & 9 deletions bun.lock

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion packages/drivers/package.json
@@ -17,10 +17,18 @@
"@google-cloud/bigquery": "^8.0.0",
"@databricks/sql": "^1.0.0",
"mysql2": "^3.0.0",
"mssql": "^11.0.0",
"mssql": "^12.0.0",
"oracledb": "^6.0.0",
"duckdb": "^1.0.0",
"mongodb": "^6.0.0",
"@clickhouse/client": "^1.0.0"
},
"peerDependencies": {
"@azure/identity": ">=4.0.0"
},
"peerDependenciesMeta": {
"@azure/identity": {
"optional": true
}
}
}
6 changes: 6 additions & 0 deletions packages/drivers/src/normalize.ts
@@ -65,6 +65,11 @@ const SQLSERVER_ALIASES: AliasMap = {
...COMMON_ALIASES,
host: ["server", "serverName", "server_name"],
trust_server_certificate: ["trustServerCertificate"],
authentication: ["authenticationType", "auth_type", "authentication_type"],
azure_tenant_id: ["tenantId", "tenant_id", "azureTenantId"],
azure_client_id: ["clientId", "client_id", "azureClientId"],
azure_client_secret: ["clientSecret", "client_secret", "azureClientSecret"],
access_token: ["token", "accessToken"],
}

const ORACLE_ALIASES: AliasMap = {
@@ -104,6 +109,7 @@ const DRIVER_ALIASES: Record<string, AliasMap> = {
mariadb: MYSQL_ALIASES,
sqlserver: SQLSERVER_ALIASES,
mssql: SQLSERVER_ALIASES,
fabric: SQLSERVER_ALIASES,
oracle: ORACLE_ALIASES,
mongodb: MONGODB_ALIASES,
mongo: MONGODB_ALIASES,
176 changes: 159 additions & 17 deletions packages/drivers/src/sqlserver.ts
@@ -6,10 +6,13 @@ import type { ConnectionConfig, Connector, ConnectorResult, ExecuteOptions, Sche

export async function connect(config: ConnectionConfig): Promise<Connector> {
let mssql: any
let MssqlConnectionPool: any
try {
// @ts-expect-error — mssql has no type declarations; installed as optional peerDependency
mssql = await import("mssql")
mssql = mssql.default || mssql
const mod = await import("mssql")
mssql = mod.default || mod
// ConnectionPool is a named export, not on .default
MssqlConnectionPool = mod.ConnectionPool ?? mssql.ConnectionPool
} catch {
throw new Error(
"SQL Server driver not installed. Run: npm install mssql",
@@ -24,8 +27,6 @@
server: config.host ?? "127.0.0.1",
port: config.port ?? 1433,
database: config.database,
user: config.user,
password: config.password,
options: {
encrypt: config.encrypt ?? false,
trustServerCertificate: config.trust_server_certificate ?? true,
@@ -39,7 +40,131 @@
},
}

pool = await mssql.connect(mssqlConfig)
// Normalize shorthand auth values to tedious-compatible types
const AUTH_SHORTHANDS: Record<string, string> = {
cli: "azure-active-directory-default",
default: "azure-active-directory-default",
password: "azure-active-directory-password",
"service-principal": "azure-active-directory-service-principal-secret",
serviceprincipal: "azure-active-directory-service-principal-secret",
"managed-identity": "azure-active-directory-msi-vm",
msi: "azure-active-directory-msi-vm",
}
const rawAuth = config.authentication as string | undefined
const authType = rawAuth ? (AUTH_SHORTHANDS[rawAuth.toLowerCase()] ?? rawAuth) : undefined

if (authType?.startsWith("azure-active-directory")) {
;(mssqlConfig.options as any).encrypt = true

// Resolve a raw Azure AD access token.
// Used by both `azure-active-directory-default` and by
// `azure-active-directory-access-token` when no token was provided.
//
// We acquire the token ourselves rather than letting tedious do it because:
// 1. Bun can resolve @azure/identity to the browser bundle (inside
// tedious or even our own import), where DefaultAzureCredential
// is a non-functional stub that throws.
// 2. Passing a credential object via type:"token-credential" hits a
// CJS/ESM isTokenCredential boundary mismatch in Bun.
//
// Strategy: try @azure/identity first (works when module resolution
// is correct), fall back to shelling out to `az account get-access-token`
// (works everywhere Azure CLI is installed).
const acquireAzureToken = async (): Promise<string> => {
let token: string | undefined

try {
const azureIdentity = await import("@azure/identity")
const credential = new azureIdentity.DefaultAzureCredential(
config.azure_client_id
? { managedIdentityClientId: config.azure_client_id as string }
: undefined,
)
const tokenResponse = await credential.getToken("https://database.windows.net/.default")
token = tokenResponse?.token
} catch {
// @azure/identity unavailable or browser bundle — fall through
}

if (!token) {
try {
const { execSync } = await import("node:child_process")
const out = execSync(
"az account get-access-token --resource https://database.windows.net/ --query accessToken -o tsv",
{ encoding: "utf-8", timeout: 15000, stdio: ["pipe", "pipe", "pipe"] },
).trim()
if (out) token = out
} catch {
// az CLI not installed or not logged in
}
}

if (!token) {
throw new Error(
"Azure AD token acquisition failed. Either install @azure/identity (npm install @azure/identity) " +
"or log in with Azure CLI (az login).",
)
}
return token
}

if (authType === "azure-active-directory-default") {
mssqlConfig.authentication = {
type: "azure-active-directory-access-token",
options: { token: await acquireAzureToken() },
}
} else if (authType === "azure-active-directory-password") {
mssqlConfig.authentication = {
type: "azure-active-directory-password",
options: {
userName: config.user,
password: config.password,
clientId: config.azure_client_id,
tenantId: config.azure_tenant_id,
},
}
} else if (authType === "azure-active-directory-access-token") {
// If the caller supplied a token, use it; otherwise acquire one
// automatically (DefaultAzureCredential → az CLI).
const suppliedToken = (config.token ?? config.access_token) as string | undefined
mssqlConfig.authentication = {
type: "azure-active-directory-access-token",
options: { token: suppliedToken ?? (await acquireAzureToken()) },
}
} else if (
authType === "azure-active-directory-msi-vm" ||
authType === "azure-active-directory-msi-app-service"
) {
mssqlConfig.authentication = {
type: authType,
options: {
...(config.azure_client_id ? { clientId: config.azure_client_id } : {}),
},
}
} else if (authType === "azure-active-directory-service-principal-secret") {
mssqlConfig.authentication = {
type: "azure-active-directory-service-principal-secret",
options: {
clientId: config.azure_client_id,
clientSecret: config.azure_client_secret,
tenantId: config.azure_tenant_id,
},
}
}
} else {
// Standard SQL Server user/password
mssqlConfig.user = config.user
mssqlConfig.password = config.password
}

// Use an explicit ConnectionPool (not the global mssql.connect()) so
// multiple simultaneous connections to different servers are isolated.
if (MssqlConnectionPool) {
pool = new MssqlConnectionPool(mssqlConfig)
await pool.connect()
} else {
pool = await mssql.connect(mssqlConfig)
}
},

async execute(sql: string, limit?: number, _binds?: any[], options?: ExecuteOptions): Promise<ConnectorResult> {
@@ -62,22 +187,39 @@
}

const result = await pool.request().query(query)
const rows = result.recordset ?? []
const recordset = result.recordset ?? []
const truncated = effectiveLimit > 0 && recordset.length > effectiveLimit
const limitedRecordset = truncated ? recordset.slice(0, effectiveLimit) : recordset

// mssql merges unnamed columns (e.g. SELECT COUNT(*), SUM(...)) into a
// single array under the empty-string key: row[""] = [val1, val2, ...].
// Flatten only the empty-string key to restore positional column values;
// legitimate array values from other keys are preserved as-is.
const flattenRow = (row: any): any[] => {
const vals: any[] = []
for (const [k, v] of Object.entries(row)) {
if (k === "" && Array.isArray(v)) vals.push(...v)
else vals.push(v)
}
return vals
}

const rows = limitedRecordset.map(flattenRow)
const sampleFlat = rows.length > 0 ? rows[0] : []
const namedKeys = recordset.length > 0 ? Object.keys(recordset[0]) : []
const columns =
rows.length > 0
? Object.keys(rows[0]).filter((k) => !k.startsWith("_"))
: (result.recordset?.columns
? Object.keys(result.recordset.columns)
: [])
const truncated = effectiveLimit > 0 && rows.length > effectiveLimit
const limitedRows = truncated ? rows.slice(0, effectiveLimit) : rows
namedKeys.length === sampleFlat.length
? namedKeys
: sampleFlat.length > 0
? sampleFlat.map((_: any, i: number) => `col_${i}`)
: (result.recordset?.columns
? Object.keys(result.recordset.columns)
: [])

return {
columns,
rows: limitedRows.map((row: any) =>
columns.map((col) => row[col]),
),
row_count: limitedRows.length,
rows,
row_count: rows.length,
truncated,
}
},