Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e-cli/e2e-config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"sdk": "php",
"test_suites": "basic",
"test_suites": "basic,retry",
"auto_settings": false,
"patch": null,
"env": {}
Expand Down
52 changes: 34 additions & 18 deletions e2e-cli/main.php
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,9 @@ function parseHost(string $apiHost): string
* Build the options array for Segment\Client.
*
* @param array<string,mixed> $input
* @param array<int,string> &$errors collected error messages
* @return array<string,mixed>
*/
function buildClientOptions(array $input, array &$errors): array
function buildClientOptions(array $input): array
{
$config = $input['config'] ?? [];
$apiHost = $input['apiHost'] ?? '';
Expand All @@ -154,10 +153,11 @@ function buildClientOptions(array $input, array &$errors): array
// mock test server (the base LibCurl hardcodes https://).
'consumer' => E2eLibCurl::class,
'protocol' => $scheme,
'error_handler' => function (int $code, string $message) use (&$errors): void {
$msg = "HTTP {$code}: {$message}";
debugLog('SDK error — ' . $msg);
$errors[] = $msg;
// Log HTTP errors to stderr only — success/failure is determined by
// track()/flush() return values, not by the error_handler callback,
// because handleError fires for transient retry errors too.
'error_handler' => function (int $code, string $message): void {
debugLog("SDK HTTP error {$code}: {$message}");
},
];

Expand All @@ -176,6 +176,11 @@ function buildClientOptions(array $input, array &$errors): array
debugLog('curl_timeout: ' . $options['curl_timeout']);
}

if (isset($config['maxRetries']) && is_numeric($config['maxRetries'])) {
$options['retry_count'] = (int)$config['maxRetries'];
debugLog('retry_count: ' . $options['retry_count']);
}

return $options;
}

Expand Down Expand Up @@ -241,9 +246,10 @@ function buildMessage(array $event): array
}

$errors = [];
$autoFlushFailed = false; // set true if an enqueue() auto-flush returns false

// Build client options (error_handler captures into $errors by reference)
$options = buildClientOptions($input, $errors);
// Build client options (error_handler just logs; we track success via return values)
$options = buildClientOptions($input);

debugLog('Creating Segment\\Client with writeKey=' . substr($writeKey, 0, 4) . '...');

Expand All @@ -268,30 +274,35 @@ function buildMessage(array $event): array

debugLog(" [{$seqIndex}/{$eventIndex}] Enqueueing {$type}");

$enqueueOk = true;
switch ($type) {
case 'track':
$client->track($message);
$enqueueOk = $client->track($message);
break;
case 'identify':
$client->identify($message);
$enqueueOk = $client->identify($message);
break;
case 'page':
$client->page($message);
$enqueueOk = $client->page($message);
break;
case 'screen':
$client->screen($message);
$enqueueOk = $client->screen($message);
break;
case 'alias':
$client->alias($message);
$enqueueOk = $client->alias($message);
break;
case 'group':
$client->group($message);
$enqueueOk = $client->group($message);
break;
default:
$errors[] = "Unknown event type: {$type}";
debugLog(" Unknown event type: {$type}");
break;
}
if (!$enqueueOk) {
$autoFlushFailed = true;
debugLog(" Enqueue/auto-flush failed for {$type}");
}
}
}

Expand All @@ -306,14 +317,19 @@ function buildMessage(array $event): array
$errors[] = 'Flush failed';
}

$hasErrors = !empty($errors);
$success = $flushOk && !$hasErrors;
// Success = all flushes succeeded and no fatal errors.
// auto-flushes (from enqueue when flush_at reached) and explicit flush are both tracked.
$overallSuccess = $flushOk && !$autoFlushFailed && empty($errors);

if ($success) {
if ($overallSuccess) {
outputResult(true, $sentBatches);
exit(0);
} else {
$errorMsg = implode('; ', $errors);
$allErrors = array_merge(
$errors,
$autoFlushFailed ? ['Auto-flush failed'] : []
);
$errorMsg = implode('; ', $allErrors ?: ['Unknown flush failure']);
outputResult(false, $sentBatches, $errorMsg);
exit(1);
}
148 changes: 94 additions & 54 deletions lib/Consumer/LibCurl.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,92 +9,132 @@ class LibCurl extends QueueConsumer
protected string $type = 'LibCurl';

/**
* Make a sync request to our API. If debug is
* enabled, we wait for the response
* and retry once to diminish impact on performance.
* Send a batch of messages to the API with spec-compliant retry logic:
* - 2xx/3xx: success
* - 429 + Retry-After: sleep without consuming retry budget
* - 429 without Retry-After / other retryable (5xx except 501/505/511,
* 408/410/460): exponential backoff, counts against retry budget
* - Non-retryable 4xx / 501/505/511: drop immediately
*
* @param array $messages array of all the messages to send
* @return bool whether the request succeeded
*/
public function flushBatch(array $messages): bool
{
$body = $this->payload($messages);
$body = $this->payload($messages);
$payload = json_encode($body);
$secret = $this->secret;
$secret = $this->secret;

if ($this->compress_request) {
$payload = gzencode($payload);
}

if ($this->host) {
$host = $this->host;
} else {
$host = 'api.segment.io';
}
$path = '/v1/batch';
$url = $this->protocol . $host . $path;
$host = $this->host ?: 'api.segment.io';
$url = $this->protocol . $host . '/v1/batch';

$backoff = 100; // Set initial waiting time to 100ms
$library = $messages[0]['context']['library'];
$userAgent = $library['name'] . '/' . $library['version'];

while ($backoff < $this->maximum_backoff_duration) {
// open connection
$ch = curl_init();
$backoffMs = 500; // base 500ms per e2e spec
$backoffCapMs = 60000; // cap 60s
$retriesRemaining = $this->retry_count;
$attempt = 0;
$backoffStartTime = null;
$rateLimitStartTime = null;

// set the url, number of POST vars, POST data
curl_setopt($ch, CURLOPT_USERPWD, $secret . ':');
curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
curl_setopt($ch, CURLOPT_TIMEOUT, $this->curl_timeout);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $this->curl_connecttimeout);
while (true) {
$attempt++;
$responseHeaders = [];

$ch = curl_init();

// set variables for headers
$header = [];
$header[] = 'Content-Type: application/json';
$headers = [
'Content-Type: application/json',
'User-Agent: ' . $userAgent,
];

if ($this->compress_request) {
$header[] = 'Content-Encoding: gzip';
$headers[] = 'Content-Encoding: gzip';
}

// Send user agent in the form of {library_name}/{library_version} as per RFC 7231.
$library = $messages[0]['context']['library'];
$libName = $library['name'];
$libVersion = $library['version'];
$header[] = "User-Agent: $libName/$libVersion";
if ($attempt > 1) {
$headers[] = 'X-Retry-Count: ' . ($attempt - 1);
}

curl_setopt($ch, CURLOPT_HTTPHEADER, $header);
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_USERPWD, $secret . ':');
curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
curl_setopt($ch, CURLOPT_TIMEOUT, $this->curl_timeout);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $this->curl_connecttimeout);
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_HEADERFUNCTION, function ($curl, $header) use (&$responseHeaders) {
$parts = explode(':', $header, 2);
if (count($parts) === 2) {
$responseHeaders[strtolower(trim($parts[0]))] = trim($parts[1]);
}

return strlen($header);
});

// retry failed requests just once to diminish impact on performance
$responseContent = curl_exec($ch);
$err = curl_error($ch);
$responseCode = (int)curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);

$err = curl_error($ch);
if ($err) {
$this->handleError(curl_errno($ch), $err);
$this->handleError(0, $err);

return false;
}

$responseCode = (int)curl_getinfo($ch, CURLINFO_HTTP_CODE);
// 2xx and 3xx are success
if ($responseCode >= 200 && $responseCode < 400) {
return true;
}

//close connection
curl_close($ch);
$this->handleError($responseCode, $responseContent);

// 429: check for Retry-After header first
if ($responseCode === 429) {
$retryAfterS = $this->parseRetryAfter($responseHeaders['retry-after'] ?? null);

if ($retryAfterS !== null) {
if ($rateLimitStartTime === null) {
$rateLimitStartTime = microtime(true);
}

if ($responseCode !== 200) {
// log error
$this->handleError($responseCode, $responseContent);

if (($responseCode >= 500 && $responseCode <= 600) || $responseCode === 429) {
// If status code is greater than 500 and less than 600, it indicates server error
// Error code 429 indicates rate limited.
// Retry uploading in these cases.
usleep($backoff * 1000);
$backoff *= 2;
} elseif ($responseCode >= 400) {
break;
if ((microtime(true) - $rateLimitStartTime) * 1000 >= $this->max_rate_limit_duration_ms) {
return false;
}

$sleepMs = min($retryAfterS * 1000, $this->rate_limit_retry_after_cap_s * 1000);
usleep($sleepMs * 1000);
continue; // Do NOT decrement retriesRemaining
}
} else {
break; // no error
// No Retry-After: fall through to counted backoff
}
}

return true;
if (!$this->isRetryable($responseCode)) {
return false;
}

$retriesRemaining--;

if ($retriesRemaining <= 0) {
return false;
}

if ($backoffStartTime === null) {
$backoffStartTime = microtime(true);
}

if ((microtime(true) - $backoffStartTime) * 1000 >= $this->max_total_backoff_duration_ms) {
return false;
}

usleep($backoffMs * 1000);
$backoffMs = min($backoffMs * 2, $backoffCapMs);
}
}
}
Loading