Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions scrapinghub/hubstorage/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ def _get_package_version():
return __version__


class _JobQClientProxy:

def __init__(self, client, endpoint):
self.auth = client.auth
self.endpoint = endpoint
self.use_msgpack = False
self.request = client.request


class HubstorageClient(object):

DEFAULT_ENDPOINT = 'https://storage.scrapinghub.com/'
Expand All @@ -60,7 +69,7 @@ class HubstorageClient(object):

def __init__(self, auth=None, endpoint=None, connection_timeout=None,
max_retries=None, max_retry_time=None, user_agent=None,
use_msgpack=True):
use_msgpack=True, *, jobq_endpoint=None):
"""
Note:
max_retries and max_retry_time change how the client attempt to retry failing requests that are
Expand All @@ -78,14 +87,24 @@ def __init__(self, auth=None, endpoint=None, connection_timeout=None,
max_retries (int): The number of time idempotent requests may be retried
max_retry_time (int): The time, in seconds, during which the client can retry a request
use_msgpack (bool): Flag to enable/disable msgpack use for serialization
jobq_endpoint (str, optional): The JobQ API root address.
Keyword-only argument. If not provided, it will be read from
the ``SHUB_JOBQ`` environment variable, or fall back to the
value of ``endpoint``.
"""
self.auth = xauth(auth)
self.endpoint = endpoint or os.getenv("SHUB_STORAGE", self.DEFAULT_ENDPOINT)
self._jobq_endpoint = (
jobq_endpoint or
os.getenv("SHUB_JOBQ") or
self.endpoint
)
self.connection_timeout = connection_timeout or self.DEFAULT_CONNECTION_TIMEOUT_S
self.user_agent = user_agent or self.DEFAULT_USER_AGENT
self.session = self._create_session()
self.retrier = self._create_retrier(max_retries, max_retry_time)
self.jobq = JobQ(self, None)
self._jobq_client = _JobQClientProxy(self, self._jobq_endpoint)
self.jobq = JobQ(self._jobq_client, None)
self.projects = Projects(self, None)
self.root = ResourceType(self, None)
self._batchuploader = None
Expand Down
3 changes: 2 additions & 1 deletion scrapinghub/hubstorage/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ def __init__(self, client, key, auth=None, jobauth=None, metadata=None):
self.logs = Logs(client, self.key, self.auth)
self.samples = Samples(client, self.key, self.auth)
self.requests = Requests(client, self.key, self.auth)
self.jobq = JobQ(client, self.key.split('/')[0], auth)
jobq_client = getattr(client, '_jobq_client', client)
self.jobq = JobQ(jobq_client, self.key.split('/')[0], auth)

def close_writers(self):
wl = [self.items, self.logs, self.samples, self.requests]
Expand Down
3 changes: 2 additions & 1 deletion scrapinghub/hubstorage/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ def __init__(self, client, projectid, auth=None):
self.items = Items(client, self.projectid, auth=auth)
self.logs = Logs(client, self.projectid, auth=auth)
self.samples = Samples(client, self.projectid, auth=auth)
self.jobq = JobQ(client, self.projectid, auth=auth)
jobq_client = getattr(client, '_jobq_client', client)
self.jobq = JobQ(jobq_client, self.projectid, auth=auth)
self.activity = Activity(client, self.projectid, auth=auth)
self.collections = Collections(client, self.projectid, auth=auth)
self.frontier = Frontier(client, self.projectid, auth=auth)
Expand Down
43 changes: 43 additions & 0 deletions tests/test_jobq_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from scrapinghub import HubstorageClient


def test_hubstorage_jobq_defaults_to_storage_endpoint(monkeypatch):
monkeypatch.delenv('SHUB_JOBQ', raising=False)
client = HubstorageClient(auth='apikey', endpoint='https://storage.example/')
assert client.jobq.url.startswith('https://storage.example/')


def test_hubstorage_jobq_endpoint_uses_env_var(monkeypatch):
monkeypatch.setenv('SHUB_JOBQ', 'https://jobq-internal.zyte.com/')
client = HubstorageClient(auth='apikey', endpoint='https://storage.example/')
assert client.jobq.url.startswith('https://jobq-internal.zyte.com/')


def test_hubstorage_jobq_endpoint_argument(monkeypatch):
monkeypatch.delenv('SHUB_JOBQ', raising=False)
client = HubstorageClient(
auth='apikey',
endpoint='https://storage.example/',
jobq_endpoint='https://jobq.example/',
)
assert client.jobq.url.startswith('https://jobq.example/')


def test_hubstorage_connection_timeout_positional_compatibility(monkeypatch):
monkeypatch.delenv('SHUB_JOBQ', raising=False)
client = HubstorageClient('apikey', 'https://storage.example/', 12)
assert client.connection_timeout == 12
assert client.jobq.url.startswith('https://storage.example/')


def test_project_and_job_jobq_use_configured_endpoint():
client = HubstorageClient(
auth='apikey',
endpoint='https://storage.example/',
jobq_endpoint='https://jobq.example/',
)
project = client.get_project('123')
job = client.get_job('123/1/1')

assert project.jobq.url.startswith('https://jobq.example/')
assert job.jobq.url.startswith('https://jobq.example/')
Loading