Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions backend/dataall/modules/worksheets/api/resolvers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataall.base.db import exceptions
from dataall.modules.worksheets.api.enums import WorksheetRole
from dataall.modules.worksheets.services.worksheet_enums import WorksheetRole, WorksheetResultsFormat
from dataall.modules.worksheets.db.worksheet_models import Worksheet
from dataall.modules.worksheets.db.worksheet_repositories import WorksheetRepository
from dataall.modules.worksheets.services.worksheet_service import WorksheetService
Expand Down Expand Up @@ -73,7 +73,18 @@ def delete_worksheet(context, source, worksheetUri: str = None):


def create_athena_query_result_download_url(context: Context, source, input: dict = None):
    """Validate a query-result download request and delegate it to the service layer.

    Args:
        context: Request context; must expose ``engine.scoped_session()``.
        source: Parent object supplied by the resolver machinery (not used here).
        input: Request payload. ``athenaQueryId`` and ``fileFormat`` are required;
            the payload is forwarded to the service as ``data``.

    Returns:
        Whatever ``WorksheetQueryResultService.download_sql_query_result`` returns.

    Raises:
        exceptions.RequiredParameter: If ``input``, ``athenaQueryId`` or
            ``fileFormat`` is missing or empty.
        exceptions.InvalidInput: If ``fileFormat`` is not one of the
            ``WorksheetResultsFormat`` values (compared case-insensitively).
    """
    if not input:
        raise exceptions.RequiredParameter('data')
    if not input.get('athenaQueryId'):
        raise exceptions.RequiredParameter('athenaQueryId')
    if not input.get('fileFormat'):
        raise exceptions.RequiredParameter('fileFormat')

    supported_formats = [result_format.value for result_format in WorksheetResultsFormat]
    # Check membership against the enum *values*. hasattr() on the enum class
    # only tests attribute existence and raises AttributeError when the
    # client sends a non-string fileFormat.
    file_format = str(input['fileFormat']).lower()
    if file_format not in supported_formats:
        raise exceptions.InvalidInput('fileFormat', input.get('fileFormat'), ', '.join(supported_formats))

    # Forward the normalised format: validation is case-insensitive, so the
    # service must not receive e.g. 'XLSX' when the enum value is 'xlsx'.
    data = {**input, 'fileFormat': file_format}

    with context.engine.scoped_session() as session:
        return WorksheetQueryResultService.download_sql_query_result(session=session, data=data)
5 changes: 2 additions & 3 deletions backend/dataall/modules/worksheets/api/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,14 @@
name='WorksheetQueryResult',
fields=[
gql.Field(name='worksheetQueryResultUri', type=gql.ID),
gql.Field(name='queryType', type=gql.NonNullableType(gql.String)),
gql.Field(name='sqlBody', type=gql.String),
gql.Field(name='AthenaQueryId', type=gql.NonNullableType(gql.String)),
gql.Field(name='region', type=gql.NonNullableType(gql.String)),
gql.Field(name='AwsAccountId', type=gql.NonNullableType(gql.String)),
gql.Field(name='ElapsedTimeInMs', type=gql.Integer),
gql.Field(name='elapsedTimeInMs', type=gql.Integer),
gql.Field(name='created', type=gql.NonNullableType(gql.String)),
gql.Field(name='downloadLink', type=gql.String),
gql.Field(name='OutputLocation', type=gql.String),
gql.Field(name='outputLocation', type=gql.String),
gql.Field(name='expiresIn', type=gql.AWSDateTime),
gql.Field(name='fileFormat', type=gql.String),
],
Expand Down
73 changes: 73 additions & 0 deletions backend/dataall/modules/worksheets/aws/s3_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import boto3
from botocore.config import Config
Comment thread
dlpzx marked this conversation as resolved.
Outdated

from botocore.exceptions import ClientError
import logging
from dataall.base.db.exceptions import AWSResourceNotFound
from dataall.base.aws.sts import SessionHelper

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from dataall.core.environment.db.environment_models import Environment
try:
from mypy_boto3_s3 import S3Client as S3ClientType
except ImportError:
S3ClientType = None

log = logging.getLogger(__name__)


class S3Client:
    """Small wrapper around a boto3 S3 client created for one environment.

    The client is obtained from ``SessionHelper.remote_session`` using the
    environment's ``AwsAccountId`` and ``region``.
    """

    def __init__(self, env: 'Environment'):
        """Create the underlying S3 client for ``env``'s account and region."""
        self._client = SessionHelper.remote_session(env.AwsAccountId, env.region).client('s3', region_name=env.region)
        self._env = env

    @property
    def client(self) -> 'S3ClientType':
        """The underlying boto3 S3 client."""
        return self._client

    def get_presigned_url(self, bucket, key, expire_minutes: int = 15):
        """Return a presigned ``get_object`` URL for ``s3://bucket/key``.

        Args:
            bucket: Bucket name.
            key: Object key.
            expire_minutes: Link lifetime in minutes. Numeric strings are
                accepted because callers may pass a value read from the
                process environment.

        Raises:
            ValueError: If ``expire_minutes`` cannot be converted to an integer.
            ClientError: Re-raised unchanged after being logged.
        """
        # int() first: multiplying a str by 60 would repeat the text 60 times
        # instead of converting minutes to seconds.
        expire_seconds = int(expire_minutes) * 60
        try:
            return self.client.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket, 'Key': key},
                ExpiresIn=expire_seconds,
            )
        except ClientError as e:
            log.error(f'Failed to get presigned URL due to: {e}')
            raise

    def object_exists(self, bucket, key) -> bool:
        """Return True if ``head_object`` succeeds, False if it answers 404.

        Raises:
            AWSResourceNotFound: For any other client error; the original
                error is chained and included in the message.
        """
        try:
            self.client.head_object(Bucket=bucket, Key=key)
            return True
        except ClientError as e:
            if e.response['Error']['Code'] == '404':
                log.info(f'Object {key} not found in bucket {bucket}')
                return False
            log.error(f'Failed to check object existence due to: {e}')
            # Same exception type as before, but the message no longer claims
            # "not found" for failures that are something else.
            raise AWSResourceNotFound(
                's3_object_exists', f'Unable to verify object {key} in bucket {bucket}: {e}'
            ) from e

    def put_object(self, bucket, key, body):
        """Upload ``body`` to ``s3://bucket/key``; client errors are logged and re-raised."""
        try:
            self.client.put_object(Bucket=bucket, Key=key, Body=body)
        except ClientError as e:
            log.error(f'Failed to put object due to: {e}')
            raise

    def get_object(self, bucket, key) -> str:
        """Download ``s3://bucket/key`` and return its content decoded as UTF-8.

        Client errors are logged and re-raised.
        """
        try:
            response = self.client.get_object(Bucket=bucket, Key=key)
            return response['Body'].read().decode('utf-8')
        except ClientError as e:
            log.error(f'Failed to get object due to: {e}')
            raise
5 changes: 3 additions & 2 deletions backend/dataall/modules/worksheets/db/worksheet_models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import enum

from future.backports.email.policy import default
from sqlalchemy import Column, DateTime, Integer, Enum, String, BigInteger
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import query_expression
Expand All @@ -27,10 +28,10 @@ class Worksheet(Resource, Base):

class WorksheetQueryResult(Base):
__tablename__ = 'worksheet_query_result'
worksheetQueryResultUri = Column(String, primary_key=True, default=utils.uuid('worksheetQueryResultUri'))
worksheetUri = Column(String, nullable=False)
AthenaQueryId = Column(String, primary_key=True)
AthenaQueryId = Column(String, nullable=False)
status = Column(String, nullable=True)
queryType = Column(Enum(QueryType), nullable=False, default=True)
sqlBody = Column(String, nullable=True)
AwsAccountId = Column(String, nullable=False)
region = Column(String, nullable=False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,8 @@ class WorksheetRole(GraphQLEnumMapper):
Creator = '950'
Admin = '900'
NoPermission = '000'


class WorksheetResultsFormat(GraphQLEnumMapper):
    """File formats in which a worksheet query result can be requested.

    The values are lowercase strings; the service code uses them as the
    file extension of the result object's S3 key.
    """

    CSV = 'csv'
    XLSX = 'xlsx'
Original file line number Diff line number Diff line change
@@ -1,52 +1,37 @@
import csv
import io
import os
from datetime import datetime, timedelta
from datetime import datetime, timedelta, UTC as DATETIME_UTC
from typing import TYPE_CHECKING

from openpyxl import Workbook

from dataall.base.aws.s3_client import S3_client
from dataall.base.db import exceptions
from dataall.core.environment.services.environment_service import EnvironmentService
from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService
from dataall.modules.worksheets.aws.s3_client import S3Client
from dataall.modules.worksheets.db.worksheet_models import WorksheetQueryResult
from dataall.modules.worksheets.db.worksheet_repositories import WorksheetRepository
from dataall.modules.worksheets.services.worksheet_enums import WorksheetResultsFormat
from dataall.modules.worksheets.services.worksheet_permissions import RUN_ATHENA_QUERY
from dataall.modules.worksheets.services.worksheet_service import WorksheetService

Comment thread
dlpzx marked this conversation as resolved.
if TYPE_CHECKING:
try:
from sqlalchemy.orm import Session
from mypy_boto3_s3.client import S3Client
from openpyxl.worksheet.worksheet import Worksheet
except ImportError:
print('skipping type checks as stubs are not installed')
S3Client = None
Session = None
Worksheet = None


class WorksheetQueryResultService:
SupportedFormats = {'csv', 'xlsx'}
_DEFAULT_ATHENA_QUERIES_PATH = 'athenaqueries'
_DEFAULT_QUERY_RESULTS_TIMEOUT = os.getenv('QUERY_RESULT_TIMEOUT_MINUTES', 120)

Comment thread
dlpzx marked this conversation as resolved.
@staticmethod
def validate_input(data):
if not data:
raise exceptions.InvalidInput('data', data, 'input is required')
if not data.get('athenaQueryId'):
raise exceptions.RequiredParameter('athenaQueryId')
if not data.get('fileFormat'):
raise exceptions.RequiredParameter('fileFormat')
if data.get('fileFormat', '').lower() not in WorksheetQueryResultService.SupportedFormats:
raise exceptions.InvalidInput(
'fileFormat', data.get('fileFormat'), ', '.join(WorksheetQueryResultService.SupportedFormats)
)

@staticmethod
def get_output_bucket(session: 'Session', environment_uri: str) -> str:
environment = EnvironmentService.get_environment_by_uri(session, environment_uri)
bucket = environment.EnvironmentDefaultBucketName
return bucket

@staticmethod
def create_query_result(
def _create_query_result(
environment_bucket: str, athena_workgroup: str, worksheet_uri: str, region: str, aws_account_id: str, data: dict
) -> WorksheetQueryResult:
sql_query_result = WorksheetQueryResult(
Expand All @@ -55,21 +40,21 @@ def create_query_result(
fileFormat=data.get('fileFormat'),
OutputLocation=f's3://{environment_bucket}/athenaqueries/{athena_workgroup}/',
Comment thread
dlpzx marked this conversation as resolved.
Outdated
region=region,
AwsAccountId=aws_account_id,
queryType='data',
AwsAccountId=aws_account_id
)
return sql_query_result

Comment thread
dlpzx marked this conversation as resolved.
@staticmethod
def get_file_key(
workgroup: str, query_id: str, file_format: str = 'csv', athena_queries_dir: str = 'athenaqueries'
def build_s3_file_path(
workgroup: str, query_id: str, athena_queries_dir: str = None
) -> str:
return f'{athena_queries_dir}/{workgroup}/{query_id}.{file_format}'
athena_queries_dir = athena_queries_dir or WorksheetQueryResultService._DEFAULT_ATHENA_QUERIES_PATH
return f'{athena_queries_dir}/{workgroup}/{query_id}'

@staticmethod
def convert_csv_to_xlsx(csv_data) -> io.BytesIO:
wb = Workbook()
ws = wb.active
ws: 'Worksheet' = wb.active
csv_reader = csv.reader(csv_data.splitlines())
for row in csv_reader:
ws.append(row)
Expand All @@ -80,58 +65,48 @@ def convert_csv_to_xlsx(csv_data) -> io.BytesIO:
return excel_buffer

@staticmethod
def handle_xlsx_format(output_bucket: str, file_key: str) -> bool:
aws_region_name = os.getenv('AWS_REGION_NAME', 'eu-west-1')
file_name, _ = file_key.split('.')
csv_data = S3_client.get_object(region=aws_region_name, bucket=output_bucket, key=f'{file_name}.csv')
excel_buffer = WorksheetQueryResultService.convert_csv_to_xlsx(csv_data)
S3_client.put_object(region=aws_region_name, bucket=output_bucket, key=file_key, body=excel_buffer)
return True

@staticmethod
@ResourcePolicyService.has_resource_permission(RUN_ATHENA_QUERY)
def download_sql_query_result(session: 'Session', data: dict = None):
# # default timeout for the download link is 2 hours(in minutes)
default_timeout = os.getenv('QUERY_RESULT_TIMEOUT_MINUTES', 120)

environment = EnvironmentService.get_environment_by_uri(session, data.get('environmentUri'))
worksheet = WorksheetService.get_worksheet_by_uri(session, data.get('worksheetUri'))
env_group = EnvironmentService.get_environment_group(
session, worksheet.SamlAdminGroupName, environment.environmentUri
)
output_file_key = WorksheetQueryResultService.get_file_key(
env_group.environmentAthenaWorkGroup, data.get('athenaQueryId'), data.get('fileFormat')
)
sql_query_result = WorksheetRepository.find_query_result_by_format(
session, data.get('worksheetUri'), data.get('athenaQueryId'), data.get('fileFormat')
)
if data.get('fileFormat') == 'xlsx':
is_job_failed = WorksheetQueryResultService.handle_xlsx_format(
environment.EnvironmentDefaultBucketName, output_file_key
)
if is_job_failed:
raise ValueError('Error while preparing the xlsx file')

s3_client = S3Client(environment)
if not sql_query_result:
sql_query_result = WorksheetQueryResultService.create_query_result(
sql_query_result = WorksheetQueryResultService._create_query_result(
environment.EnvironmentDefaultBucketName,
env_group.environmentAthenaWorkGroup,
worksheet.worksheetUri,
environment.region,
environment.AwsAccountId,
data,
)
S3_client.object_exists(
region=environment.region, bucket=environment.EnvironmentDefaultBucketName, key=output_file_key
output_file_s3_path = WorksheetQueryResultService.build_s3_file_path(
env_group.environmentAthenaWorkGroup, data.get('athenaQueryId')
)
if sql_query_result.fileFormat == WorksheetResultsFormat.XLSX.value:
try:
csv_data = s3_client.get_object(bucket=environment.EnvironmentDefaultBucketName, key=f'{output_file_s3_path}.{WorksheetResultsFormat.CSV.value}')
excel_buffer = WorksheetQueryResultService.convert_csv_to_xlsx(csv_data)
s3_client.put_object(bucket=environment.EnvironmentDefaultBucketName, key=f'{output_file_s3_path}.{WorksheetResultsFormat.XLSX.value}', body=excel_buffer)
except Exception as e:
raise exceptions.AWSResourceNotAvailable('CONVERT_CSV_TO_EXCEL',f'Failed to convert csv to xlsx: {e}')

s3_client.object_exists(
bucket=environment.EnvironmentDefaultBucketName, key=f'{output_file_s3_path}.{sql_query_result.fileFormat}'
)
if sql_query_result.is_download_link_expired():
url = S3_client.get_presigned_url(
region=environment.region,
url = s3_client.get_presigned_url(
bucket=environment.EnvironmentDefaultBucketName,
key=output_file_key,
expire_minutes=default_timeout,
key=f'{output_file_s3_path}.{sql_query_result.fileFormat}',
expire_minutes=WorksheetQueryResultService._DEFAULT_QUERY_RESULTS_TIMEOUT,
)
sql_query_result.downloadLink = url
sql_query_result.expiresIn = datetime.utcnow() + timedelta(seconds=default_timeout)
sql_query_result.expiresIn = datetime.now(DATETIME_UTC) + timedelta(minutes=WorksheetQueryResultService._DEFAULT_QUERY_RESULTS_TIMEOUT)

session.add(sql_query_result)
session.commit()
Expand Down
Comment thread
dlpzx marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

# revision identifiers, used by Alembic.
revision = 'd1d6da1b2d67'
down_revision = 'd274e756f0ae'
down_revision = 'f87aecc36d39'
branch_labels = None
depends_on = None

Expand All @@ -25,9 +25,16 @@ def upgrade():
op.add_column('worksheet_query_result', sa.Column('expiresIn', sa.DateTime(), nullable=True))
op.add_column('worksheet_query_result', sa.Column('updated', sa.DateTime(), nullable=False))
op.add_column('worksheet_query_result', sa.Column('fileFormat', sa.String(), nullable=True))
op.drop_constraint('AthenaQueryId', 'worksheet_query_result', type_='primary')
op.create_primary_key('worksheet_query_result_pkey', 'worksheet_query_result', ['worksheetQueryResultUri'])
op.alter_column('worksheet_query_result', 'AthenaQueryId', nullable=False)
op.drop_column('worksheet_query_result', 'queryType')


def downgrade():
op.add_column('worksheet_query_result', sa.Column('queryType', sa.VARCHAR(), autoincrement=False, nullable=True))
op.drop_constraint('worksheet_query_result_pkey', 'worksheet_query_result', type_='primary')
op.create_primary_key('AthenaQueryId', 'worksheet_query_result', ['AthenaQueryId'])
op.drop_column('worksheet_query_result', 'fileFormat')
op.drop_column('worksheet_query_result', 'updated')
op.drop_column('worksheet_query_result', 'expiresIn')
Expand Down
Comment thread
dlpzx marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,40 +1,22 @@
import { gql } from 'apollo-boost';

/**
 * Build the GraphQL request that asks the backend for a download URL of an
 * Athena query result.
 *
 * @param {Object} input - Passed through unchanged as the
 *   `WorksheetQueryResultDownloadUrlInput` variable.
 * @returns {{variables: {input: Object}, query: Object}} Request descriptor
 *   holding the variables and the parsed mutation document.
 */
export const createWorksheetQueryResultDownloadUrl = (input) => ({
  variables: {
    input
  },
  query: gql`
    mutation CreateWorksheetQueryResultDownloadUrl(
      $input: WorksheetQueryResultDownloadUrlInput!
    ) {
      createWorksheetQueryResultDownloadUrl(input: $input) {
        downloadLink
        AthenaQueryId
        expiresIn
        fileFormat
        outputLocation
      }
    }
  `
});


Loading