|
42 | 42 | import com.google.cloud.bigquery.TableDefinition; |
43 | 43 | import com.google.cloud.bigquery.TableId; |
44 | 44 | import com.google.cloud.bigquery.exception.BigQueryJdbcException; |
| 45 | +import com.google.common.annotations.VisibleForTesting; |
45 | 46 | import io.opentelemetry.api.trace.Span; |
46 | 47 | import io.opentelemetry.api.trace.SpanContext; |
47 | 48 | import io.opentelemetry.api.trace.StatusCode; |
@@ -1731,32 +1732,57 @@ private ResultSet getTablesImpl( |
1731 | 1732 | "getTables called for catalog: %s, schemaPattern: %s, tableNamePattern: %s, types: %s", |
1732 | 1733 | effectiveCatalog, effectiveSchemaPattern, tableNamePattern, Arrays.toString(types)); |
1733 | 1734 |
|
| 1735 | + final Schema resultSchema = defineGetTablesSchema(); |
| 1736 | + final BlockingQueue<BigQueryFieldValueListWrapper> queue = |
| 1737 | + new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); |
| 1738 | + |
| 1739 | + Thread fetcherThread = |
| 1740 | + runGetTablesTaskAsync( |
| 1741 | + effectiveCatalog, effectiveSchemaPattern, tableNamePattern, types, resultSchema, queue); |
| 1742 | + |
| 1743 | + BigQueryJsonResultSet resultSet = |
| 1744 | + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); |
| 1745 | + |
| 1746 | + LOG.info("Started background thread for getTables"); |
| 1747 | + return resultSet; |
| 1748 | + } |
| 1749 | + |
| 1750 | + @VisibleForTesting |
| 1751 | + Thread runGetTablesTaskAsync( |
| 1752 | + String catalog, |
| 1753 | + String schemaPattern, |
| 1754 | + String tableNamePattern, |
| 1755 | + String[] types, |
| 1756 | + Schema resultSchema, |
| 1757 | + BlockingQueue<BigQueryFieldValueListWrapper> queue) |
| 1758 | + throws SQLException { |
| 1759 | + |
| 1760 | + Tuple<String, String> effectiveIdentifiers = |
| 1761 | + determineEffectiveCatalogAndSchema(catalog, schemaPattern); |
| 1762 | + String effectiveCatalog = effectiveIdentifiers.x(); |
| 1763 | + String effectiveSchemaPattern = effectiveIdentifiers.y(); |
| 1764 | + |
1734 | 1765 | final Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern); |
1735 | 1766 | final Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern); |
1736 | 1767 | final Set<String> requestedTypes = |
1737 | 1768 | (types == null || types.length == 0) ? null : new HashSet<>(Arrays.asList(types)); |
1738 | 1769 |
|
1739 | | - final Schema resultSchema = defineGetTablesSchema(); |
1740 | 1770 | final FieldList resultSchemaFields = resultSchema.getFields(); |
1741 | | - |
1742 | | - final BlockingQueue<BigQueryFieldValueListWrapper> queue = |
1743 | | - new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); |
1744 | 1771 | final List<FieldValueList> collectedResults = Collections.synchronizedList(new ArrayList<>()); |
1745 | 1772 | final String catalogParam = effectiveCatalog; |
1746 | 1773 | final String schemaParam = effectiveSchemaPattern; |
1747 | | - |
1748 | | - Tracer tracer = this.connection.getTracer(); |
1749 | 1774 | SpanContext parentSpanContext = Span.current().getSpanContext(); |
1750 | 1775 | Runnable tableFetcher = |
1751 | 1776 | () -> { |
1752 | 1777 | Span backgroundSpan = |
1753 | | - tracer |
| 1778 | + this.connection |
| 1779 | + .getTracer() |
1754 | 1780 | .spanBuilder("BigQueryDatabaseMetaData.getTables.background") |
1755 | 1781 | .setNoParent() |
1756 | 1782 | .addLink(parentSpanContext) |
1757 | 1783 | .startSpan(); |
1758 | 1784 |
|
1759 | | - try (Scope backgroundScope = backgroundSpan.makeCurrent()) { |
| 1785 | + try (Scope scope = backgroundSpan.makeCurrent()) { |
1760 | 1786 | ExecutorService apiExecutor = null; |
1761 | 1787 | ExecutorService tableProcessorExecutor = null; |
1762 | 1788 | final FieldList localResultSchemaFields = resultSchemaFields; |
@@ -1898,12 +1924,8 @@ private ResultSet getTablesImpl( |
1898 | 1924 |
|
1899 | 1925 | Runnable wrappedTableFetcher = Context.current().wrap(tableFetcher); |
1900 | 1926 | Thread fetcherThread = new Thread(wrappedTableFetcher, "getTables-fetcher-" + effectiveCatalog); |
1901 | | - BigQueryJsonResultSet resultSet = |
1902 | | - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); |
1903 | | - |
1904 | 1927 | fetcherThread.start(); |
1905 | | - LOG.info("Started background thread for getTables"); |
1906 | | - return resultSet; |
| 1928 | + return fetcherThread; |
1907 | 1929 | } |
1908 | 1930 |
|
1909 | 1931 | Schema defineGetTablesSchema() { |
@@ -2127,24 +2149,51 @@ private ResultSet getColumnsImpl( |
2127 | 2149 | + " columnNamePattern: %s", |
2128 | 2150 | effectiveCatalog, effectiveSchemaPattern, tableNamePattern, columnNamePattern); |
2129 | 2151 |
|
| 2152 | + final Schema resultSchema = defineGetColumnsSchema(); |
| 2153 | + final BlockingQueue<BigQueryFieldValueListWrapper> queue = |
| 2154 | + new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); |
| 2155 | + |
| 2156 | + Thread fetcherThread = |
| 2157 | + runGetColumnsTaskAsync( |
| 2158 | + catalog, schemaPattern, tableNamePattern, columnNamePattern, resultSchema, queue); |
| 2159 | + |
| 2160 | + BigQueryJsonResultSet resultSet = |
| 2161 | + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); |
| 2162 | + |
| 2163 | + LOG.info("Started background thread for getColumns"); |
| 2164 | + return resultSet; |
| 2165 | + } |
| 2166 | + |
| 2167 | + @VisibleForTesting |
| 2168 | + Thread runGetColumnsTaskAsync( |
| 2169 | + String catalog, |
| 2170 | + String schemaPattern, |
| 2171 | + String tableNamePattern, |
| 2172 | + String columnNamePattern, |
| 2173 | + Schema resultSchema, |
| 2174 | + BlockingQueue<BigQueryFieldValueListWrapper> queue) |
| 2175 | + throws SQLException { |
| 2176 | + |
| 2177 | + Tuple<String, String> effectiveIdentifiers = |
| 2178 | + determineEffectiveCatalogAndSchema(catalog, schemaPattern); |
| 2179 | + String effectiveCatalog = effectiveIdentifiers.x(); |
| 2180 | + String effectiveSchemaPattern = effectiveIdentifiers.y(); |
| 2181 | + |
2130 | 2182 | Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern); |
2131 | 2183 | Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern); |
2132 | 2184 | Pattern columnNameRegex = compileSqlLikePattern(columnNamePattern); |
2133 | 2185 |
|
2134 | | - final Schema resultSchema = defineGetColumnsSchema(); |
2135 | 2186 | final FieldList resultSchemaFields = resultSchema.getFields(); |
2136 | | - final BlockingQueue<BigQueryFieldValueListWrapper> queue = |
2137 | | - new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); |
2138 | 2187 | final List<FieldValueList> collectedResults = Collections.synchronizedList(new ArrayList<>()); |
2139 | 2188 | final String catalogParam = effectiveCatalog; |
2140 | 2189 | final String schemaParam = effectiveSchemaPattern; |
2141 | 2190 |
|
2142 | | - Tracer tracer = this.connection.getTracer(); |
2143 | 2191 | SpanContext parentSpanContext = Span.current().getSpanContext(); |
2144 | 2192 | Runnable columnFetcher = |
2145 | 2193 | () -> { |
2146 | 2194 | Span backgroundSpan = |
2147 | | - tracer |
| 2195 | + this.connection |
| 2196 | + .getTracer() |
2148 | 2197 | .spanBuilder("BigQueryDatabaseMetaData.getColumns.background") |
2149 | 2198 | .setNoParent() |
2150 | 2199 | .addLink(parentSpanContext) |
@@ -2252,12 +2301,8 @@ private ResultSet getColumnsImpl( |
2252 | 2301 | Runnable wrappedColumnFetcher = Context.current().wrap(columnFetcher); |
2253 | 2302 | Thread fetcherThread = |
2254 | 2303 | new Thread(wrappedColumnFetcher, "getColumns-fetcher-" + effectiveCatalog); |
2255 | | - BigQueryJsonResultSet resultSet = |
2256 | | - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); |
2257 | | - |
2258 | 2304 | fetcherThread.start(); |
2259 | | - LOG.info("Started background thread for getColumns"); |
2260 | | - return resultSet; |
| 2305 | + return fetcherThread; |
2261 | 2306 | } |
2262 | 2307 |
|
2263 | 2308 | private void processTableColumns( |
@@ -2324,7 +2369,7 @@ private void processTableColumns( |
2324 | 2369 | } |
2325 | 2370 | } |
2326 | 2371 |
|
2327 | | - private Schema defineGetColumnsSchema() { |
| 2372 | + Schema defineGetColumnsSchema() { |
2328 | 2373 | List<Field> fields = new ArrayList<>(24); |
2329 | 2374 | fields.add( |
2330 | 2375 | Field.newBuilder("TABLE_CAT", StandardSQLTypeName.STRING) |
@@ -3690,27 +3735,44 @@ private ResultSet getSchemasImpl(String catalog, String schemaPattern) throws SQ |
3690 | 3735 |
|
3691 | 3736 | LOG.info("getSchemas called for catalog: %s, schemaPattern: %s", catalog, schemaPattern); |
3692 | 3737 |
|
3693 | | - final Pattern schemaRegex = compileSqlLikePattern(schemaPattern); |
3694 | 3738 | final Schema resultSchema = defineGetSchemasSchema(); |
3695 | | - final FieldList resultSchemaFields = resultSchema.getFields(); |
3696 | | - |
3697 | 3739 | final BlockingQueue<BigQueryFieldValueListWrapper> queue = |
3698 | 3740 | new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); |
| 3741 | + |
| 3742 | + Thread fetcherThread = runGetSchemasTaskAsync(catalog, schemaPattern, resultSchema, queue); |
| 3743 | + |
| 3744 | + BigQueryJsonResultSet resultSet = |
| 3745 | + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); |
| 3746 | + |
| 3747 | + LOG.info("Started background thread for getSchemas"); |
| 3748 | + return resultSet; |
| 3749 | + } |
| 3750 | + |
| 3751 | + @VisibleForTesting |
| 3752 | + Thread runGetSchemasTaskAsync( |
| 3753 | + String catalog, |
| 3754 | + String schemaPattern, |
| 3755 | + Schema resultSchema, |
| 3756 | + BlockingQueue<BigQueryFieldValueListWrapper> queue) |
| 3757 | + throws SQLException { |
| 3758 | + |
| 3759 | + final Pattern schemaRegex = compileSqlLikePattern(schemaPattern); |
| 3760 | + final FieldList resultSchemaFields = resultSchema.getFields(); |
3699 | 3761 | final List<FieldValueList> collectedResults = Collections.synchronizedList(new ArrayList<>()); |
3700 | 3762 | final String catalogParam = catalog; |
3701 | 3763 |
|
3702 | | - Tracer tracer = this.connection.getTracer(); |
3703 | 3764 | SpanContext parentSpanContext = Span.current().getSpanContext(); |
3704 | 3765 | Runnable schemaFetcher = |
3705 | 3766 | () -> { |
3706 | 3767 | Span backgroundSpan = |
3707 | | - tracer |
| 3768 | + this.connection |
| 3769 | + .getTracer() |
3708 | 3770 | .spanBuilder("BigQueryDatabaseMetaData.getSchemas.background") |
3709 | 3771 | .setNoParent() |
3710 | 3772 | .addLink(parentSpanContext) |
3711 | 3773 | .startSpan(); |
3712 | 3774 |
|
3713 | | - try (Scope backgroundScope = backgroundSpan.makeCurrent()) { |
| 3775 | + try (Scope scope = backgroundSpan.makeCurrent()) { |
3714 | 3776 | final FieldList localResultSchemaFields = resultSchemaFields; |
3715 | 3777 | List<String> projectsToScanList = new ArrayList<>(); |
3716 | 3778 |
|
@@ -3791,12 +3853,8 @@ private ResultSet getSchemasImpl(String catalog, String schemaPattern) throws SQ |
3791 | 3853 |
|
3792 | 3854 | Runnable wrappedFetcher = Context.current().wrap(schemaFetcher); |
3793 | 3855 | Thread fetcherThread = new Thread(wrappedFetcher, "getSchemas-fetcher-" + catalog); |
3794 | | - BigQueryJsonResultSet resultSet = |
3795 | | - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); |
3796 | | - |
3797 | 3856 | fetcherThread.start(); |
3798 | | - LOG.info("Started background thread for getSchemas"); |
3799 | | - return resultSet; |
| 3857 | + return fetcherThread; |
3800 | 3858 | } |
3801 | 3859 |
|
3802 | 3860 | Schema defineGetSchemasSchema() { |
|
0 commit comments