Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"name": "issues",
"json_schema": {},
"source_defined_cursor": true,
"default_cursor_field": ["created_at"],
"default_cursor_field": ["updated_at"],
"supported_sync_modes": ["full_refresh", "incremental"]
},
"sync_mode": "incremental",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"issues": {
"created_at": "2024-01-01T00:00:00Z"
"updated_at": "2024-01-01T00:00:00Z"
}
}
54 changes: 42 additions & 12 deletions airbyte-integrations/connectors/source-pylon/manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,26 @@ definitions:
type: SimpleRetriever
requester:
$ref: "#/definitions/base_requester"
path: issues
http_method: GET
path: issues/search
http_method: POST
request_body_json:
filter: |
{
"operator": "and",
"field": "updated_at",
"subfilters": [
{
"field": "updated_at",
"operator": "time_is_after",
"value": "{{ stream_interval.start_time }}"
},
{
"field": "updated_at",
"operator": "time_is_before",
"value": "{{ stream_interval.end_time }}"
}
]
}
error_handler:
$ref: "#/definitions/error_handler"
record_selector:
Expand All @@ -86,10 +104,10 @@ definitions:
field_path:
- data
paginator:
$ref: "#/definitions/cursor_paginator"
$ref: "#/definitions/issues_paginator"
incremental_sync:
type: DatetimeBasedCursor
cursor_field: created_at
cursor_field: updated_at
cursor_datetime_formats:
- "%Y-%m-%dT%H:%M:%S.%fZ"
- "%Y-%m-%dT%H:%M:%SZ"
Expand All @@ -104,14 +122,6 @@ definitions:
datetime_format: "%Y-%m-%dT%H:%M:%SZ"
step: P30D
cursor_granularity: PT1S
start_time_option:
type: RequestOption
inject_into: request_parameter
field_name: start_time
end_time_option:
type: RequestOption
inject_into: request_parameter
field_name: end_time
schema_loader:
type: InlineSchemaLoader
schema:
Expand Down Expand Up @@ -500,6 +510,22 @@ definitions:
cursor_value: "{{ response.get('pagination', {}).get('cursor', '') }}"
stop_condition: "{{ not response.get('pagination', {}).get('has_next_page', False) }}"

issues_paginator:
type: DefaultPaginator
page_token_option:
type: RequestOption
inject_into: body_json
field_name: cursor
page_size_option:
type: RequestOption
inject_into: body_json
field_name: limit
pagination_strategy:
type: CursorPagination
page_size: 999
cursor_value: "{{ response.get('pagination', {}).get('cursor', '') }}"
stop_condition: "{{ not response.get('pagination', {}).get('has_next_page', False) }}"

streams:
- $ref: "#/definitions/streams/accounts"
- $ref: "#/definitions/streams/contacts"
Expand Down Expand Up @@ -724,6 +750,10 @@ schemas:
type:
- "null"
- string
updated_at:
type:
- "null"
- string
first_response_time:
type:
- "null"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-pylon/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: f2e53e88-3c6b-4e5a-b7c2-a1d9c5e8f4b6
dockerImageTag: 0.0.5
dockerImageTag: 0.0.6
dockerRepository: airbyte/source-pylon
githubIssueLabel: source-pylon
icon: icon.svg
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.

"""
Unit tests for source-pylon manifest configuration.

Validates that the issues stream correctly uses POST /issues/search
with updated_at cursor field and body_json injection for pagination.
"""

from pathlib import Path

import pytest
import yaml


MANIFEST_PATH = Path(__file__).parent.parent / "manifest.yaml"


@pytest.fixture()
def manifest():
with open(MANIFEST_PATH) as f:
return yaml.safe_load(f)


# ---------------------------------------------------------------------------
# Issues stream: endpoint and HTTP method
# ---------------------------------------------------------------------------


def test_issues_stream_uses_post_method(manifest):
"""The issues stream must use POST /issues/search instead of GET /issues."""
issues = manifest["definitions"]["streams"]["issues"]
requester = issues["retriever"]["requester"]
assert requester["http_method"] == "POST"
assert requester["path"] == "issues/search"


# ---------------------------------------------------------------------------
# Issues stream: cursor field
# ---------------------------------------------------------------------------


def test_issues_stream_cursor_field_is_updated_at(manifest):
"""The issues stream must use updated_at as cursor field."""
issues = manifest["definitions"]["streams"]["issues"]
assert issues["incremental_sync"]["cursor_field"] == "updated_at"


# ---------------------------------------------------------------------------
# Issues stream: request body filter
# ---------------------------------------------------------------------------


def test_issues_stream_has_request_body_json_filter(manifest):
"""The issues stream must send a filter in the request body."""
issues = manifest["definitions"]["streams"]["issues"]
requester = issues["retriever"]["requester"]
assert "request_body_json" in requester
assert "filter" in requester["request_body_json"]


def test_issues_stream_filter_uses_updated_at(manifest):
"""The request body filter must reference the updated_at field."""
issues = manifest["definitions"]["streams"]["issues"]
filter_template = issues["retriever"]["requester"]["request_body_json"]["filter"]
assert "updated_at" in filter_template
assert "time_is_after" in filter_template
assert "time_is_before" in filter_template


def test_issues_stream_filter_references_stream_interval(manifest):
"""The filter must use stream_interval for start/end times."""
issues = manifest["definitions"]["streams"]["issues"]
filter_template = issues["retriever"]["requester"]["request_body_json"]["filter"]
assert "stream_interval.start_time" in filter_template
assert "stream_interval.end_time" in filter_template


def test_issues_stream_filter_has_and_operator(manifest):
"""The filter must use the 'and' operator to combine time conditions."""
issues = manifest["definitions"]["streams"]["issues"]
filter_template = issues["retriever"]["requester"]["request_body_json"]["filter"]
assert '"operator": "and"' in filter_template


def test_issues_stream_no_start_end_time_options(manifest):
"""The issues stream must NOT have start_time_option/end_time_option."""
issues = manifest["definitions"]["streams"]["issues"]
incremental_sync = issues["incremental_sync"]
assert "start_time_option" not in incremental_sync
assert "end_time_option" not in incremental_sync


# ---------------------------------------------------------------------------
# Issues paginator: body_json injection
# ---------------------------------------------------------------------------


def test_issues_paginator_uses_body_json(manifest):
"""The issues paginator must inject cursor and limit into the request body."""
paginator = manifest["definitions"]["issues_paginator"]
assert paginator["page_token_option"]["inject_into"] == "body_json"
assert paginator["page_token_option"]["field_name"] == "cursor"
assert paginator["page_size_option"]["inject_into"] == "body_json"
assert paginator["page_size_option"]["field_name"] == "limit"


def test_issues_paginator_strategy(manifest):
"""The issues paginator must use CursorPagination with page_size 999."""
paginator = manifest["definitions"]["issues_paginator"]
strategy = paginator["pagination_strategy"]
assert strategy["type"] == "CursorPagination"
assert strategy["page_size"] == 999


# ---------------------------------------------------------------------------
# Issues schema: updated_at field
# ---------------------------------------------------------------------------


def test_issues_schema_has_updated_at(manifest):
"""The issues schema must include the updated_at field."""
schema = manifest["schemas"]["issues"]
assert "updated_at" in schema["properties"]
assert schema["properties"]["updated_at"]["type"] == ["null", "string"]


def test_issues_schema_retains_created_at(manifest):
"""The issues schema must still include the created_at field."""
schema = manifest["schemas"]["issues"]
assert "created_at" in schema["properties"]


# ---------------------------------------------------------------------------
# Non-issues streams: unchanged
# ---------------------------------------------------------------------------


@pytest.mark.parametrize(
"stream_name",
[
pytest.param("accounts", id="accounts"),
pytest.param("contacts", id="contacts"),
pytest.param("tags", id="tags"),
pytest.param("teams", id="teams"),
pytest.param("users", id="users"),
],
)
def test_other_streams_still_use_get(manifest, stream_name):
"""Non-issues streams must still use GET."""
stream = manifest["definitions"]["streams"][stream_name]
requester = stream["retriever"]["requester"]
assert requester["http_method"] == "GET", f"{stream_name} should still use GET"


def test_cursor_paginator_still_uses_request_parameter(manifest):
"""The shared cursor_paginator (for GET streams) must still use request_parameter."""
paginator = manifest["definitions"]["cursor_paginator"]
assert paginator["page_token_option"]["inject_into"] == "request_parameter"


# ---------------------------------------------------------------------------
# Substream compatibility: issue_messages and issue_threads
# ---------------------------------------------------------------------------


def test_issue_messages_uses_issues_parent(manifest):
"""issue_messages must still use issues as parent stream."""
stream = manifest["definitions"]["streams"]["issue_messages"]
partition_router = stream["retriever"]["partition_router"]
assert partition_router["parent_stream_configs"][0]["stream"]["$ref"] == "#/definitions/streams/issues"


def test_issue_threads_uses_issues_parent(manifest):
"""issue_threads must still use issues as parent stream."""
stream = manifest["definitions"]["streams"]["issue_threads"]
partition_router = stream["retriever"]["partition_router"]
assert partition_router["parent_stream_configs"][0]["stream"]["$ref"] == "#/definitions/streams/issues"
3 changes: 2 additions & 1 deletion docs/integrations/sources/pylon.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ The Pylon source connector supports the following streams:

### Stream notes

- **Issues** is the only incremental stream. It uses `created_at` as its cursor field and syncs data in 30-day windows, as required by the Pylon API.
- **Issues** is the only incremental stream. It uses `updated_at` as its cursor field and syncs data in 30-day windows via the `POST /issues/search` endpoint. This ensures that incremental syncs capture both newly created and recently modified issues.
- **Issue Messages** and **Issue Threads** are child streams of Issues. They retrieve data for each issue returned by the Issues stream, so the Start Date configuration indirectly affects these streams.
- **Knowledge Base Articles** is a child stream of Knowledge Bases.
- **Custom Fields** queries the Pylon API once for each of the three supported object types: account, issue, and contact.
Expand All @@ -73,6 +73,7 @@ The Pylon API enforces per-endpoint rate limits. The Issues endpoint allows 10 r

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :----------------------------- |
| 0.0.6 | 2026-04-03 | [76085](https://github.com/airbytehq/airbyte/pull/76085) | Switch issues stream to POST /issues/search with updated_at cursor for proper incremental sync support |
| 0.0.5 | 2026-03-31 | [75855](https://github.com/airbytehq/airbyte/pull/75855) | Update dependencies |
| 0.0.4 | 2026-03-17 | [74921](https://github.com/airbytehq/airbyte/pull/74921) | Update dependencies |
| 0.0.3 | 2026-02-24 | [73850](https://github.com/airbytehq/airbyte/pull/73850) | Update dependencies |
Expand Down
Loading