Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
- Added a new datatype `DayTimeIntervalType` that allows users to create intervals for datetime operations.
- Added support for `FileOperation.list` to list files in a stage with metadata.
- Added support for `FileOperation.remove` to remove files in a stage.
- Added an option to specify `copy_grants` for the following `DataFrame` APIs:
- `create_or_replace_view`
- `create_or_replace_temp_view`
- `create_or_replace_dynamic_table`
- Added a new function `snowflake.snowpark.functions.vectorized` that allows users to mark a function as vectorized UDF.
- Added support for parameter `use_vectorized_scanner` in function `Session.write_pandas()`.
- Added support for the following scalar functions in `functions.py`:
Expand Down
14 changes: 8 additions & 6 deletions src/snowflake/snowpark/_internal/analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1338,12 +1338,13 @@ def do_resolve_with_resolved_children(
)

return self.plan_builder.create_or_replace_view(
logical_plan.name,
resolved_children[logical_plan.child],
is_temp,
logical_plan.comment,
logical_plan.replace,
logical_plan,
name=logical_plan.name,
child=resolved_children[logical_plan.child],
is_temp=is_temp,
comment=logical_plan.comment,
replace=logical_plan.replace,
copy_grants=logical_plan.copy_grants,
source_plan=logical_plan,
)

if isinstance(logical_plan, CreateDynamicTableCommand):
Expand All @@ -1365,6 +1366,7 @@ def do_resolve_with_resolved_children(
child=resolved_children[logical_plan.child],
source_plan=logical_plan,
iceberg_config=logical_plan.iceberg_config,
copy_grants=logical_plan.copy_grants,
)

if isinstance(logical_plan, ReadFileNode):
Expand Down
11 changes: 9 additions & 2 deletions src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,12 @@ def order_expression(name: str, direction: str, null_ordering: str) -> str:


def create_or_replace_view_statement(
name: str, child: str, is_temp: bool, comment: Optional[str], replace: bool
name: str,
child: str,
is_temp: bool,
comment: Optional[str],
replace: bool,
copy_grants: bool,
) -> str:
comment_sql = get_comment_sql(comment)
return (
Expand All @@ -1386,6 +1391,7 @@ def create_or_replace_view_statement(
+ VIEW
+ name
+ comment_sql
+ (COPY_GRANTS if copy_grants else EMPTY_STRING)
+ AS
+ project_statement([], child)
)
Expand All @@ -1406,6 +1412,7 @@ def create_or_replace_dynamic_table_statement(
max_data_extension_time: Optional[int],
child: str,
iceberg_config: Optional[dict] = None,
copy_grants: bool = False,
) -> str:
cluster_by_sql = (
f"{CLUSTER_BY}{LEFT_PARENTHESIS}{COMMA.join(clustering_keys)}{RIGHT_PARENTHESIS}"
Expand Down Expand Up @@ -1436,7 +1443,7 @@ def create_or_replace_dynamic_table_statement(
f"{IF + NOT + EXISTS if if_not_exists else EMPTY_STRING}{name}{LAG}{EQUALS}"
f"{convert_value_to_sql_option(lag)}{WAREHOUSE}{EQUALS}{warehouse}"
f"{refresh_and_initialize_options}{cluster_by_sql}{data_retention_options}{iceberg_options}"
f"{comment_sql}{AS}{project_statement([], child)}"
f"{comment_sql}{COPY_GRANTS if copy_grants else EMPTY_STRING}{AS}{project_statement([], child)}"
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1548,6 +1548,7 @@ def create_or_replace_view(
is_temp: bool,
comment: Optional[str],
replace: bool,
copy_grants: bool,
source_plan: Optional[LogicalPlan],
) -> SnowflakePlan:
if len(child.queries) != 1:
Expand All @@ -1574,7 +1575,7 @@ def create_or_replace_view(

return self.build(
lambda x: create_or_replace_view_statement(
name, x, is_temp, comment, replace
name, x, is_temp, comment, replace, copy_grants
),
child,
source_plan,
Expand Down Expand Up @@ -1666,6 +1667,7 @@ def create_or_replace_dynamic_table(
child: SnowflakePlan,
source_plan: Optional[LogicalPlan],
iceberg_config: Optional[dict] = None,
copy_grants: bool = False,
) -> SnowflakePlan:

child = self.find_and_update_table_function_plan(child)
Expand Down Expand Up @@ -1705,6 +1707,7 @@ def create_or_replace_dynamic_table(
max_data_extension_time=max_data_extension_time,
child=x,
iceberg_config=iceberg_config,
copy_grants=copy_grants,
),
child,
source_plan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,13 +322,15 @@ def __init__(
view_type: ViewType,
comment: Optional[str],
replace: bool,
copy_grants: bool,
child: LogicalPlan,
) -> None:
super().__init__(child)
self.name = name
self.view_type = view_type
self.comment = comment
self.replace = replace
self.copy_grants = copy_grants


class CreateDynamicTableCommand(UnaryNode):
Expand All @@ -347,6 +349,7 @@ def __init__(
max_data_extension_time: Optional[int],
child: LogicalPlan,
iceberg_config: Optional[dict] = None,
copy_grants: bool = False,
) -> None:
super().__init__(child)
self.name = name
Expand All @@ -361,3 +364,4 @@ def __init__(
self.data_retention_time = data_retention_time
self.max_data_extension_time = max_data_extension_time
self.iceberg_config = iceberg_config
self.copy_grants = copy_grants
40 changes: 21 additions & 19 deletions src/snowflake/snowpark/_internal/proto/ast.proto
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ message DataframeCollect {
repeated Tuple_String_String statement_params = 7;
}

// dataframe-io.ir:163
// dataframe-io.ir:165
message DataframeCopyIntoTable {
repeated Tuple_String_Expr copy_options = 1;
Expr df = 2;
Expand All @@ -859,32 +859,34 @@ message DataframeCount {
repeated Tuple_String_String statement_params = 4;
}

// dataframe-io.ir:147
// dataframe-io.ir:148
message DataframeCreateOrReplaceDynamicTable {
repeated Expr clustering_keys = 1;
google.protobuf.StringValue comment = 2;
google.protobuf.Int64Value data_retention_time = 3;
Expr df = 4;
google.protobuf.StringValue initialize = 5;
bool is_transient = 6;
string lag = 7;
google.protobuf.Int64Value max_data_extension_time = 8;
SaveMode mode = 9;
NameRef name = 10;
google.protobuf.StringValue refresh_mode = 11;
SrcPosition src = 12;
repeated Tuple_String_String statement_params = 13;
string warehouse = 14;
bool copy_grants = 3;
google.protobuf.Int64Value data_retention_time = 4;
Expr df = 5;
google.protobuf.StringValue initialize = 6;
bool is_transient = 7;
string lag = 8;
google.protobuf.Int64Value max_data_extension_time = 9;
SaveMode mode = 10;
NameRef name = 11;
google.protobuf.StringValue refresh_mode = 12;
SrcPosition src = 13;
repeated Tuple_String_String statement_params = 14;
string warehouse = 15;
}

// dataframe-io.ir:139
message DataframeCreateOrReplaceView {
google.protobuf.StringValue comment = 1;
Expr df = 2;
bool is_temp = 3;
NameRef name = 4;
SrcPosition src = 5;
repeated Tuple_String_String statement_params = 6;
bool copy_grants = 2;
Expr df = 3;
bool is_temp = 4;
NameRef name = 5;
SrcPosition src = 6;
repeated Tuple_String_String statement_params = 7;
}

// dataframe.ir:185
Expand Down
29 changes: 24 additions & 5 deletions src/snowflake/snowpark/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5246,6 +5246,7 @@ def create_or_replace_view(
*,
comment: Optional[str] = None,
statement_params: Optional[Dict[str, str]] = None,
copy_grants: bool = False,
_emit_ast: bool = True,
) -> List[Row]:
"""Creates a view that captures the computation expressed by this DataFrame.
Expand All @@ -5261,6 +5262,8 @@ def create_or_replace_view(
that specifies the database name, schema name, and view name.
comment: Adds a comment for the created view. See
`COMMENT <https://docs.snowflake.com/en/sql-reference/sql/comment>`_.
copy_grants: A boolean value that specifies whether to retain the access permissions from the original view
when a new view is created. Defaults to False.
statement_params: Dictionary of statement level parameters to be set while executing this action.
"""

Expand All @@ -5272,6 +5275,7 @@ def create_or_replace_view(
stmt = self._session._ast_batch.bind()
expr = with_src_position(stmt.expr.dataframe_create_or_replace_view, stmt)
expr.is_temp = False
expr.copy_grants = copy_grants
self._set_ast_ref(expr.df)
build_view_name(expr.name, name)
if comment is not None:
Expand All @@ -5282,6 +5286,7 @@ def create_or_replace_view(
formatted_name,
PersistedView(),
comment=comment,
copy_grants=copy_grants,
_statement_params=create_or_update_statement_params_with_query_tag(
statement_params or self._statement_params,
self._session.query_tag,
Expand Down Expand Up @@ -5311,6 +5316,7 @@ def create_or_replace_dynamic_table(
max_data_extension_time: Optional[int] = None,
statement_params: Optional[Dict[str, str]] = None,
iceberg_config: Optional[dict] = None,
copy_grants: bool = False,
_emit_ast: bool = True,
) -> List[Row]:
"""Creates a dynamic table that captures the computation expressed by this DataFrame.
Expand Down Expand Up @@ -5354,6 +5360,8 @@ def create_or_replace_dynamic_table(
- base_location: the base directory that snowflake can write iceberg metadata and files to.
- catalog_sync: optionally sets the catalog integration configured for Polaris Catalog.
- storage_serialization_policy: specifies the storage serialization policy for the table.
copy_grants: A boolean value that specifies whether to retain the access permissions from the original view
when a new view is created. Defaults to False.


Note:
Expand Down Expand Up @@ -5407,6 +5415,7 @@ def create_or_replace_dynamic_table(

if statement_params is not None:
build_expr_from_dict_str_str(expr.statement_params, statement_params)
expr.copy_grants = copy_grants
# TODO: Support create_or_replace_dynamic_table in MockServerConnection.
from snowflake.snowpark.mock._connection import MockServerConnection

Expand Down Expand Up @@ -5443,6 +5452,7 @@ def create_or_replace_dynamic_table(
),
),
iceberg_config=iceberg_config,
copy_grants=copy_grants,
)

@df_collect_api_telemetry
Expand All @@ -5453,6 +5463,7 @@ def create_or_replace_temp_view(
*,
comment: Optional[str] = None,
statement_params: Optional[Dict[str, str]] = None,
copy_grants: bool = False,
_emit_ast: bool = True,
) -> List[Row]:
"""Creates or replace a temporary view that returns the same results as this DataFrame.
Expand All @@ -5472,6 +5483,8 @@ def create_or_replace_temp_view(
that specifies the database name, schema name, and view name.
comment: Adds a comment for the created view. See
`COMMENT <https://docs.snowflake.com/en/sql-reference/sql/comment>`_.
copy_grants: A boolean value that specifies whether to retain the access permissions from the original view
when a new view is created. Defaults to False.
statement_params: Dictionary of statement level parameters to be set while executing this action.
"""

Expand All @@ -5489,11 +5502,13 @@ def create_or_replace_temp_view(
expr.comment.value = comment
if statement_params is not None:
build_expr_from_dict_str_str(expr.statement_params, statement_params)
expr.copy_grants = copy_grants

return self._do_create_or_replace_view(
formatted_name,
LocalTempView(),
comment=comment,
copy_grants=copy_grants,
_statement_params=create_or_update_statement_params_with_query_tag(
statement_params or self._statement_params,
self._session.query_tag,
Expand Down Expand Up @@ -5573,16 +5588,18 @@ def _do_create_or_replace_view(
view_type: ViewType,
comment: Optional[str],
replace: bool = True,
copy_grants: bool = False,
_ast_stmt: Optional[proto.Bind] = None,
**kwargs,
):
validate_object_name(view_name)
cmd = CreateViewCommand(
view_name,
view_type,
comment,
replace,
self._plan,
name=view_name,
view_type=view_type,
comment=comment,
replace=replace,
copy_grants=copy_grants,
child=self._plan,
)

return self._session._conn.execute(
Expand All @@ -5603,6 +5620,7 @@ def _do_create_or_replace_dynamic_table(
data_retention_time: Optional[int] = None,
max_data_extension_time: Optional[int] = None,
iceberg_config: Optional[dict] = None,
copy_grants: bool = False,
**kwargs,
):
validate_object_name(name)
Expand Down Expand Up @@ -5630,6 +5648,7 @@ def _do_create_or_replace_dynamic_table(
max_data_extension_time=max_data_extension_time,
child=self._plan,
iceberg_config=iceberg_config,
copy_grants=copy_grants,
)

return self._session._conn.execute(
Expand Down
Loading