Skip to content

Commit 24bd060

Browse files
authored
chore(gcp chronicle sink): remove timberio/chronicle-emulator (#25670)
1 parent 7d8c120 commit 24bd060

5 files changed

Lines changed: 83 additions & 145 deletions

File tree

Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -994,7 +994,6 @@ all-integration-tests = [
994994
"aws-integration-tests",
995995
"axiom-integration-tests",
996996
"azure-integration-tests",
997-
"chronicle-integration-tests",
998997
"clickhouse-integration-tests",
999998
"databend-integration-tests",
1000999
"datadog-agent-integration-tests",
@@ -1065,7 +1064,6 @@ aws-sns-integration-tests = ["sinks-aws_sns"]
10651064
axiom-integration-tests = ["sinks-axiom"]
10661065
azure-blob-integration-tests = ["sinks-azure_blob"]
10671066
azure-logs-ingestion-integration-tests = ["sinks-azure_logs_ingestion"]
1068-
chronicle-integration-tests = ["sinks-gcp"]
10691067
clickhouse-integration-tests = ["sinks-clickhouse"]
10701068
databend-integration-tests = ["sinks-databend"]
10711069
datadog-agent-integration-tests = ["sources-datadog_agent"]

src/sinks/gcp_chronicle/chronicle_unstructured.rs

Lines changed: 45 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -670,136 +670,59 @@ impl Service<ChronicleRequest> for ChronicleService {
670670
}
671671
}
672672

673-
#[cfg(all(test, feature = "chronicle-integration-tests"))]
674-
mod integration_tests {
675-
use reqwest::{Client, Method, Response};
676-
use serde::{Deserialize, Serialize};
677-
use vector_lib::event::{BatchNotifier, BatchStatus};
673+
#[cfg(test)]
674+
mod unit_tests {
675+
use std::io::Write;
678676

679-
use super::*;
680-
use crate::test_util::{
681-
components::{
682-
COMPONENT_ERROR_TAGS, SINK_TAGS, run_and_assert_sink_compliance,
683-
run_and_assert_sink_error,
684-
},
685-
random_events_with_stream, random_string, trace_init,
686-
};
687-
688-
const ADDRESS_ENV_VAR: &str = "CHRONICLE_ADDRESS";
689-
690-
fn config(log_type: &str, auth_path: &str) -> ChronicleUnstructuredConfig {
691-
let address = std::env::var(ADDRESS_ENV_VAR).unwrap();
692-
let config = format!(
693-
indoc! { r#"
694-
endpoint: "{}"
695-
customer_id: "customer id"
696-
namespace: namespace
697-
credentials_path: "{}"
698-
log_type: "{}"
699-
encoding:
700-
codec: text
701-
"# },
702-
address, auth_path, log_type
703-
);
677+
use tempfile::NamedTempFile;
678+
use wiremock::{Mock, MockServer, ResponseTemplate, matchers::method};
704679

705-
let config: ChronicleUnstructuredConfig = serde_yaml::from_str(&config).unwrap();
706-
config
707-
}
708-
709-
async fn config_build(
710-
log_type: &str,
711-
auth_path: &str,
712-
) -> crate::Result<(VectorSink, crate::sinks::Healthcheck)> {
713-
let cx = SinkContext::default();
714-
config(log_type, auth_path).build(cx).await
715-
}
680+
use super::*;
681+
use crate::test_util::{random_string, trace_init};
716682

717-
#[ignore = "https://github.com/vectordotdev/vector/issues/24133"]
718683
#[tokio::test]
719-
async fn publish_events() {
684+
async fn invalid_credentials_rejected_by_oauth_server() {
720685
trace_init();
721686

722-
let log_type = random_string(10);
723-
let (sink, healthcheck) = config_build(&log_type, "tests/integration/gcp/config/auth.json")
724-
.await
725-
.expect("Building sink failed");
726-
727-
healthcheck.await.expect("Health check failed");
728-
729-
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
730-
let (input, events) = random_events_with_stream(100, 100, Some(batch));
731-
run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await;
732-
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
733-
734-
let response = pull_messages(&log_type).await;
735-
let messages = response
736-
.into_iter()
737-
.map(|message| message.log_text)
738-
.collect::<Vec<_>>();
739-
assert_eq!(input.len(), messages.len());
740-
for i in 0..input.len() {
741-
let data = serde_json::to_value(&messages[i]).unwrap();
742-
let expected = serde_json::to_value(input[i].as_log().get("message").unwrap()).unwrap();
743-
assert_eq!(data, expected);
744-
}
745-
}
687+
let mock_server = MockServer::start().await;
688+
Mock::given(method("POST"))
689+
.respond_with(ResponseTemplate::new(401))
690+
.mount(&mock_server)
691+
.await;
692+
693+
let base_url = mock_server.uri();
694+
let creds = serde_json::json!({
695+
"type": "service_account",
696+
"project_id": "test",
697+
"private_key_id": "1",
698+
"private_key": "-----BEGIN RSA PRIVATE KEY-----\nMIICXgIBAAKBgQDouHdVDVz0/M6PGe60Kf/g0nyOxCvbZgiUAZNzFimXDU+RpZ54\n6/oETl6VpRkbp8a4Xb8avll2lsamdHvGcsgnjJXdpp7LfWYLqHEpn0/XFM+womXg\nvglWCDwAsXmrmwpZKEC82mmyFigheyPA/sfuN6z+wa7P5B65xzIdDQX7nQIDAQAB\nAoGBANID/rUDrTrtll8v8Oon6OH0MjIIuOdzKhSfY3h9rKTDf2YaB2xq0KLoMpVr\ne8AoZb5l45t34naR1M3M2xKY7SSDAVJFfg/3Vxeot86DQ23IGLXj7LnNxXnvklXa\nEXaD8LNz/MXxS7/Lu0R+lEtjEkf23+BRb11fL6Q/EDToNHnhAkEA/FnwHhKMc/Bm\nXsS8bENuZP3SV2v7TU6MFTtXJFmsoZBxHnsM8UUi0gq9gBnApmdhy7v2N/Mv9gFI\nviSdr7vm1QJBAOwV3cHAciRHVK71TweOWIJKZBM9ZVut0VDs5GrBYZxGMBiOr3BI\ns7+0ugTKxVimuei6c0KNXw1kg3Vtc5+utakCQQDklAbXBpAomJHxt5zBKBc/7VXx\nEANyk/p5ZOXbLEsdkXuVU3p2tNwEi+v4s9r4H97Kr3goV+SSnbkpWntm6fn9AkBn\nFnE7rlXpA4C12QYGTaDWW7dxM0j0DGUvChH/j6uYuok73+o5hHWAy2DCwOwFduAN\nAIVd1S9hQLeqaf2oB3jpAkEAnRT+bAlMjtUOBO6XPNO4IbYwWJvGMcIEO7zu6AdB\nPJy3/U+bLimxFuYdrs6SnIHIUVdl35AlckHqzT54a5YKqQ==\n-----END RSA PRIVATE KEY-----",
699+
"client_email": "test@test.com",
700+
"client_id": "1",
701+
"auth_uri": format!("{base_url}/o/oauth2/auth"),
702+
"token_uri": format!("{base_url}/token"),
703+
"auth_provider_x509_cert_url": format!("{base_url}/oauth2/v1/certs"),
704+
"client_x509_cert_url": "https://example.com"
705+
});
746706

747-
#[tokio::test]
748-
async fn invalid_credentials() {
749-
trace_init();
707+
let mut tmp = NamedTempFile::new().unwrap();
708+
write!(tmp, "{creds}").unwrap();
750709

751710
let log_type = random_string(10);
752-
// Test with an auth file that doesnt match the public key sent to the dummy chronicle server.
753-
let sink = config_build(&log_type, "tests/integration/gcp/config/invalidauth.json").await;
754-
755-
assert!(sink.is_err())
756-
}
757-
758-
#[ignore = "https://github.com/vectordotdev/vector/issues/24133"]
759-
#[tokio::test]
760-
async fn publish_invalid_events() {
761-
trace_init();
762-
763-
// The chronicle-emulator we are testing against is setup so a `log_type` of "INVALID"
764-
// will return a `400 BAD_REQUEST`.
765-
let log_type = "INVALID";
766-
let (sink, healthcheck) = config_build(log_type, "tests/integration/gcp/config/auth.json")
767-
.await
768-
.expect("Building sink failed");
769-
770-
healthcheck.await.expect("Health check failed");
771-
772-
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
773-
let (_input, events) = random_events_with_stream(100, 100, Some(batch));
774-
run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;
775-
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
776-
}
777-
778-
#[derive(Clone, Debug, Deserialize, Serialize)]
779-
pub struct Log {
780-
customer_id: String,
781-
namespace: String,
782-
log_type: String,
783-
log_text: String,
784-
ts_rfc3339: String,
785-
}
786-
787-
async fn request(method: Method, path: &str, log_type: &str) -> Response {
788-
let address = std::env::var(ADDRESS_ENV_VAR).unwrap();
789-
let url = format!("{address}/{path}");
790-
Client::new()
791-
.request(method.clone(), &url)
792-
.query(&[("log_type", log_type)])
793-
.send()
794-
.await
795-
.unwrap_or_else(|_| panic!("Sending {method} request to {url} failed"))
796-
}
797-
798-
async fn pull_messages(log_type: &str) -> Vec<Log> {
799-
request(Method::GET, "logs", log_type)
800-
.await
801-
.json::<Vec<Log>>()
802-
.await
803-
.expect("Extracting pull data failed")
711+
let cx = SinkContext::default();
712+
// Normalize to forward slashes so YAML doesn't interpret Windows path separators as escapes.
713+
let creds_path = tmp.path().to_str().unwrap().replace('\\', "/");
714+
let config: ChronicleUnstructuredConfig = serde_yaml::from_str(&format!(
715+
indoc! { r#"
716+
endpoint: "http://127.0.0.1:1"
717+
customer_id: test-customer
718+
credentials_path: "{}"
719+
log_type: "{}"
720+
encoding:
721+
codec: text
722+
"# },
723+
creds_path, log_type
724+
))
725+
.unwrap();
726+
assert!(config.build(cx).await.is_err());
804727
}
805728
}

src/sources/util/http/encoding.rs

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,26 @@
11
use std::{io::Read, sync::OnceLock};
22

3-
use bytes::{Buf, BufMut, Bytes, BytesMut};
3+
use bytes::{Buf, Bytes};
4+
#[cfg(any(
5+
feature = "sources-utils-http-prelude",
6+
feature = "sources-opentelemetry",
7+
test
8+
))]
9+
use bytes::{BufMut, BytesMut};
410
use flate2::read::{MultiGzDecoder, ZlibDecoder};
11+
#[cfg(any(
12+
feature = "sources-utils-http-prelude",
13+
feature = "sources-opentelemetry",
14+
test
15+
))]
516
use futures_util::StreamExt;
617
use snap::raw::Decoder as SnappyDecoder;
7-
use warp::{Filter, filters::BoxedFilter, http::StatusCode};
18+
use warp::http::StatusCode;
19+
#[cfg(any(
20+
feature = "sources-utils-http-prelude",
21+
feature = "sources-opentelemetry"
22+
))]
23+
use warp::{Filter, filters::BoxedFilter};
824

925
use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError};
1026

@@ -30,6 +46,10 @@ pub(crate) fn max_decompressed_size_bytes() -> usize {
3046
}
3147

3248
/// Collects a request body into [`Bytes`] while enforcing an in-memory size cap.
49+
#[cfg(any(
50+
feature = "sources-utils-http-prelude",
51+
feature = "sources-opentelemetry"
52+
))]
3353
pub(crate) fn limited_body(max_body_size: usize) -> BoxedFilter<(Bytes,)> {
3454
let max_body_size_header = u64::try_from(max_body_size).unwrap_or(u64::MAX);
3555

@@ -158,6 +178,11 @@ fn decompress_snappy(
158178
Ok(decoded.into())
159179
}
160180

181+
#[cfg(any(
182+
feature = "sources-utils-http-prelude",
183+
feature = "sources-opentelemetry",
184+
test
185+
))]
161186
async fn collect_body_with_limit<S, B>(body: S, max_body_size: usize) -> Result<Bytes, ErrorMessage>
162187
where
163188
S: futures_util::Stream<Item = Result<B, warp::Error>>,
@@ -167,7 +192,12 @@ where
167192

168193
let mut bytes = BytesMut::new();
169194
while let Some(chunk) = body.next().await {
170-
let chunk = chunk.map_err(body_read_error)?;
195+
let chunk = chunk.map_err(|error| {
196+
ErrorMessage::new(
197+
StatusCode::BAD_REQUEST,
198+
format!("Failed reading request body: {error}"),
199+
)
200+
})?;
171201
if chunk.remaining() > max_body_size.saturating_sub(bytes.len()) {
172202
return Err(request_body_too_large_error(max_body_size));
173203
}
@@ -201,6 +231,11 @@ fn zstd_window_log_max(max_decompressed_size: usize) -> Option<u32> {
201231
})
202232
}
203233

234+
#[cfg(any(
235+
feature = "sources-utils-http-prelude",
236+
feature = "sources-opentelemetry",
237+
test
238+
))]
204239
fn request_body_too_large_error(max: usize) -> ErrorMessage {
205240
ErrorMessage::new(
206241
StatusCode::PAYLOAD_TOO_LARGE,
@@ -215,13 +250,6 @@ fn decompressed_too_large_error(encoding: &str, max: usize) -> ErrorMessage {
215250
)
216251
}
217252

218-
fn body_read_error(error: warp::Error) -> ErrorMessage {
219-
ErrorMessage::new(
220-
StatusCode::BAD_REQUEST,
221-
format!("Failed reading request body: {error}"),
222-
)
223-
}
224-
225253
pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage {
226254
emit!(HttpDecompressError {
227255
encoding,

tests/integration/gcp/config/compose.yaml

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,6 @@ services:
66
environment:
77
- PUBSUB_PROJECT1=testproject,topic1:subscription1
88
- PUBSUB_PROJECT2=sourceproject,topic2:subscription2
9-
chronicle-emulator:
10-
image: docker.io/timberio/chronicle-emulator:${CONFIG_VERSION}
11-
ports:
12-
- 3000:3000
13-
volumes:
14-
- ./public.pem:/public.pem:ro
15-
command:
16-
- -p
17-
- /public.pem
189

1910
networks:
2011
default:

tests/integration/gcp/config/test.yaml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
features:
22
- gcp-integration-tests
3-
- chronicle-integration-tests
43

54
test_filter: "::gcp"
65

76
env:
87
EMULATOR_ADDRESS: http://gcloud-pubsub:8681
9-
CHRONICLE_ADDRESS: http://chronicle-emulator:3000
108

119
matrix:
1210
version: [latest]

0 commit comments

Comments
 (0)