Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## [1.6.4]

### Enhancements

- **feat(slack): support file attachments and OAuth refresh tokens.** Slack indexing now emits file attachment records, downloading uses Slack private file URLs with bearer authentication, and `SlackAccessConfig` accepts `refresh_token` so platform plugin schemas can expose OAuth token rotation settings.

### Fixes

- **fix(slack): guard private file downloads.** Validate Slack private download URLs before sending bearer credentials, refuse redirects that could forward bearer credentials, stream private file downloads to disk, and use a bounded timeout for private file reads.

## [1.6.3]

### Enhancements
Expand Down
166 changes: 166 additions & 0 deletions test/unit/connectors/test_slack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
from unittest.mock import Mock

import pytest

from unstructured_ingest.data_types.file_data import (
FileData,
FileDataSourceMetadata,
SourceIdentifiers,
)
from unstructured_ingest.error import ValueError as IngestValueError
from unstructured_ingest.processes.connectors.slack import (
PRIVATE_FILE_DOWNLOAD_TIMEOUT_SECONDS,
SlackAccessConfig,
SlackDownloader,
SlackIndexer,
SlackIndexerConfig,
_NoRedirectHandler,
)


def test_slack_access_config_accepts_refresh_token():
config = SlackAccessConfig(token="xoxb-slack-token", refresh_token="xoxe-slack-refresh-token")

assert config.token == "xoxb-slack-token"
assert config.refresh_token == "xoxe-slack-refresh-token"


def test_slack_indexer_emits_file_data_for_message_files():
client = Mock()
client.conversations_history.return_value = [
{
"messages": [
{
"ts": "1710000000.000100",
"text": "Here is the report",
"files": [
{
"id": "F123",
"name": "report.pdf",
"url_private_download": "https://files.slack.com/report.pdf",
}
],
}
]
}
]
connection_config = Mock()
connection_config.get_client.return_value = client
indexer = SlackIndexer(
index_config=SlackIndexerConfig(channels=["C123"]),
connection_config=connection_config,
)

file_data = list(indexer.run())

assert len(file_data) == 2
assert file_data[0].source_identifiers.filename.endswith(".xml")
slack_file = file_data[1]
assert slack_file.source_identifiers.filename == "F123-report.pdf"
assert slack_file.metadata.record_locator == {
"type": "file",
"channel": "C123",
"message_ts": "1710000000.000100",
"file_id": "F123",
}


@pytest.mark.asyncio
async def test_slack_downloader_downloads_file_attachment(tmp_path, mocker):
async_client = Mock()
async_client.files_info = mocker.AsyncMock(
return_value={
"ok": True,
"file": {
"id": "F123",
"name": "report.pdf",
"url_private_download": "https://files.slack.com/report.pdf",
},
}
)
connection_config = Mock()
connection_config.get_async_client.return_value = async_client
connection_config.access_config.get_secret_value.return_value.token = "xoxb-slack-token"
downloader = SlackDownloader(connection_config=connection_config)
file_data = FileData(
identifier="F123",
connector_type="slack",
source_identifiers=SourceIdentifiers(
filename="F123-report.pdf",
fullpath="F123-report.pdf",
),
metadata=FileDataSourceMetadata(
record_locator={
"type": "file",
"channel": "C123",
"message_ts": "1710000000.000100",
"file_id": "F123",
}
),
)
response = Mock()
response.__enter__ = Mock(return_value=response)
response.__exit__ = Mock(return_value=None)
response.read.side_effect = [b"pdf ", b"bytes", b""]
opener = Mock()
opener.open.return_value = response
build_opener = mocker.patch("urllib.request.build_opener", return_value=opener)

await downloader._download_file(file_data, tmp_path / "F123-report.pdf")

assert (tmp_path / "F123-report.pdf").read_bytes() == b"pdf bytes"
build_opener.assert_called_once_with(_NoRedirectHandler)
opener.open.assert_called_once()
request = opener.open.call_args.args[0]
assert request.full_url == "https://files.slack.com/report.pdf"
assert request.headers["Authorization"] == "Bearer xoxb-slack-token"
assert opener.open.call_args.kwargs["timeout"] == PRIVATE_FILE_DOWNLOAD_TIMEOUT_SECONDS
assert response.read.call_count > 1


def test_slack_private_file_download_rejects_redirects():
handler = _NoRedirectHandler()

with pytest.raises(IngestValueError, match="redirected"):
handler.redirect_request(None, None, 302, "Found", {}, "https://example.com/report.pdf")


@pytest.mark.asyncio
async def test_slack_downloader_rejects_non_slack_private_download_url(tmp_path, mocker):
async_client = Mock()
async_client.files_info = mocker.AsyncMock(
return_value={
"ok": True,
"file": {
"id": "F123",
"name": "report.pdf",
"url_private_download": "https://example.com/report.pdf",
},
}
)
connection_config = Mock()
connection_config.get_async_client.return_value = async_client
connection_config.access_config.get_secret_value.return_value.token = "xoxb-slack-token"
downloader = SlackDownloader(connection_config=connection_config)
file_data = FileData(
identifier="F123",
connector_type="slack",
source_identifiers=SourceIdentifiers(
filename="F123-report.pdf",
fullpath="F123-report.pdf",
),
metadata=FileDataSourceMetadata(
record_locator={
"type": "file",
"channel": "C123",
"message_ts": "1710000000.000100",
"file_id": "F123",
}
),
)
build_opener = mocker.patch("urllib.request.build_opener")

with pytest.raises(IngestValueError, match="files.slack.com"):
await downloader._download_file(file_data, tmp_path / "F123-report.pdf")

build_opener.assert_not_called()
2 changes: 1 addition & 1 deletion unstructured_ingest/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.6.3" # pragma: no cover
__version__ = "1.6.4" # pragma: no cover
126 changes: 125 additions & 1 deletion unstructured_ingest/processes/connectors/slack.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import asyncio
import builtins
import hashlib
import re
import shutil
import time
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from datetime import datetime
Expand Down Expand Up @@ -34,15 +40,51 @@
# NOTE: Pagination limit set to the upper end of the recommended range
# https://api.slack.com/apis/pagination#facts
PAGINATION_LIMIT = 200
PRIVATE_FILE_DOWNLOAD_TIMEOUT_SECONDS = 60
SLACK_PRIVATE_FILE_HOST = "files.slack.com"

CONNECTOR_TYPE = "slack"


def _safe_slack_filename(filename: str) -> str:
sanitized = re.sub(r"[/\\]+", "_", filename).strip()
return sanitized or "slack-file"


def _validate_private_download_url(download_url: str) -> str:
parsed_url = urllib.parse.urlparse(download_url)
hostname = parsed_url.hostname.lower() if parsed_url.hostname else None

if parsed_url.scheme != "https" or hostname != SLACK_PRIVATE_FILE_HOST:
raise ValueError("Slack file download URL must be an HTTPS files.slack.com URL.")

if parsed_url.username or parsed_url.password:
raise ValueError("Slack file download URL must not include credentials.")

try:
port = parsed_url.port
except builtins.ValueError as exc:
raise ValueError("Slack file download URL has an invalid port.") from exc

if port not in (None, 443):
raise ValueError("Slack file download URL must use the default HTTPS port.")

return download_url


class _NoRedirectHandler(urllib.request.HTTPRedirectHandler):
def redirect_request(self, req, fp, code, msg, headers, newurl): # noqa: D102, ANN001
raise ValueError(
"Slack file download redirected; refusing to forward bearer authorization."
)


class SlackAccessConfig(AccessConfig):
token: str = Field(
description="Bot token used to access Slack API, must have channels:history scope for the"
" bot user."
)
refresh_token: Optional[str] = Field(default=None, description="Slack OAuth refresh token.")


class SlackConnectionConfig(ConnectionConfig):
Expand Down Expand Up @@ -109,6 +151,8 @@ def run(self, **kwargs: Any) -> Generator[FileData, None, None]:
messages = conversation_history.get("messages", [])
if messages:
yield self._messages_to_file_data(messages, channel)
for file_data in self._message_files_to_file_data(messages, channel):
yield file_data

def _messages_to_file_data(
self,
Expand Down Expand Up @@ -142,6 +186,44 @@ def _messages_to_file_data(
display_name=source_identifiers.fullpath,
)

def _message_files_to_file_data(
self,
messages: list[dict],
channel: str,
) -> Generator[FileData, None, None]:
for message in messages:
message_ts = message.get("ts")
for slack_file in message.get("files", []) or []:
file_id = slack_file.get("id")
if not file_id or not message_ts:
continue

filename = _safe_slack_filename(
f"{file_id}-{slack_file.get('name') or slack_file.get('title') or file_id}"
)
identifier_base = f"{channel}-{message_ts}-{file_id}"
identifier = hashlib.sha256(identifier_base.encode("utf-8")).hexdigest()
source_identifiers = SourceIdentifiers(filename=filename, fullpath=filename)
yield FileData(
identifier=identifier,
connector_type=CONNECTOR_TYPE,
source_identifiers=source_identifiers,
metadata=FileDataSourceMetadata(
date_created=(
str(slack_file.get("created")) if slack_file.get("created") else None
),
date_modified=message_ts,
date_processed=str(time.time()),
record_locator={
"type": "file",
"channel": channel,
"message_ts": message_ts,
"file_id": file_id,
},
),
display_name=source_identifiers.fullpath,
)

@SourceConnectionError.wrap
def precheck(self) -> None:
client = self.connection_config.get_client()
Expand Down Expand Up @@ -172,7 +254,13 @@ async def run_async(self, file_data: FileData, **kwargs) -> DownloadResponse:
)
raise ValueError("Generated invalid download path.")

await self._download_conversation(file_data, download_path)
if (
file_data.metadata.record_locator
and file_data.metadata.record_locator.get("type") == "file"
):
await self._download_file(file_data, download_path)
else:
await self._download_conversation(file_data, download_path)
return self.generate_download_response(file_data, download_path)

def is_async(self):
Expand Down Expand Up @@ -224,6 +312,42 @@ async def _download_conversation(self, file_data: FileData, download_path: Path)
download_path.parent.mkdir(exist_ok=True, parents=True)
conversation_xml.write(download_path, encoding="utf-8", xml_declaration=True)

async def _download_file(self, file_data: FileData, download_path: Path) -> None:
record_locator = file_data.metadata.record_locator
if record_locator is None or "file_id" not in record_locator:
logger.error(f"Invalid file record locator in metadata: {record_locator}.")
raise ValueError("Invalid file record locator.")

client = self.connection_config.get_async_client()
file_info = await client.files_info(file=record_locator["file_id"])
if not file_info.get("ok", True):
raise ValueError(f"Slack files.info failed: {file_info.get('error')}")

slack_file = file_info.get("file", {})
download_url = slack_file.get("url_private_download") or record_locator.get(
"url_private_download"
)
if not download_url:
raise ValueError("Slack file is missing url_private_download.")
download_url = _validate_private_download_url(download_url)

token = self.connection_config.access_config.get_secret_value().token
request = urllib.request.Request(
Comment thread
mateuszkuprowski marked this conversation as resolved.
download_url,
headers={"Authorization": f"Bearer {token}"},
Comment thread
mateuszkuprowski marked this conversation as resolved.
)
download_path.parent.mkdir(exist_ok=True, parents=True)
await asyncio.to_thread(self._download_private_file, request, download_path)

@staticmethod
def _download_private_file(request: urllib.request.Request, download_path: Path) -> None:
opener = urllib.request.build_opener(_NoRedirectHandler)
with opener.open(
request,
timeout=PRIVATE_FILE_DOWNLOAD_TIMEOUT_SECONDS,
) as response, download_path.open("wb") as output_file:
shutil.copyfileobj(response, output_file)

def _conversation_to_xml(self, conversation: list[list[dict]]) -> ET.ElementTree:
root = ET.Element("messages")

Expand Down
Loading