Commit d43c746: create coroutine backend in elasticsearch writer only once (#9826)

zxqfd555 authored and Manul from Pathway committed

GitOrigin-RevId: 7c20ddb72932066e0e15dc79596716ab30cddb71

1 parent 1534e57 · commit d43c746

4 files changed: 85 additions & 65 deletions
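In short: the writer previously built a fresh Tokio runtime inside every `flush` call via `create_async_tokio_runtime()`, and now builds one runtime in its constructor and reuses it for every bulk request. A minimal sketch of that pattern, assuming a writer that drives async work from synchronous methods; the names here are illustrative, not the repository's actual API:

```rust
use tokio::runtime::Runtime;

// Hypothetical writer illustrating the commit's pattern.
struct BlockingWriter {
    runtime: Runtime, // built once, when the writer is constructed
}

impl BlockingWriter {
    fn new() -> std::io::Result<Self> {
        // Construction is now fallible: building the runtime can fail.
        Ok(Self {
            runtime: Runtime::new()?,
        })
    }

    fn flush(&mut self) {
        // Reuse the stored runtime instead of building a fresh one per call.
        self.runtime.block_on(async {
            // ... perform the async bulk request here ...
        });
    }
}

fn main() -> std::io::Result<()> {
    let mut writer = BlockingWriter::new()?;
    writer.flush(); // every flush shares the same coroutine backend
    Ok(())
}
```

Because runtime construction can fail, `ElasticSearchWriter::new` now returns a `Result`, which is why the `python_api.rs` hunk below adds error mapping at the call site.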

File tree

- src/connectors/data_storage.rs
- src/connectors/elasticsearch.rs (new)
- src/connectors/mod.rs
- src/python_api.rs

src/connectors/data_storage.rs

Lines changed: 1 addition & 64 deletions
```diff
@@ -47,7 +47,6 @@ use rumqttc::{
     Incoming as MqttIncoming, Outgoing as MqttOutgoing, Packet as MqttPacket,
 };
 
-use crate::async_runtime::create_async_tokio_runtime;
 use crate::connectors::aws::dynamodb::Error as AwsDynamoDBError;
 use crate::connectors::aws::kinesis::Error as AwsKinesisError;
 use crate::connectors::aws::kinesis::KinesisReader;
@@ -80,7 +79,6 @@ use async_nats::client::PublishError as NatsPublishError;
 use async_nats::error::Error as NatsError;
 use async_nats::jetstream::context::PublishErrorKind as JetStreamPublishError;
 use bincode::ErrorKind as BincodeError;
-use elasticsearch::{BulkParts, Elasticsearch};
 use glob::PatternError as GlobPatternError;
 use mongodb::bson::Document as BsonDocument;
 use mongodb::error::Error as MongoError;
@@ -107,6 +105,7 @@ pub use super::data_lake::delta::{
 };
 pub use super::data_lake::iceberg::IcebergReader;
 pub use super::data_lake::LakeWriter;
+pub use super::elasticsearch::ElasticSearchWriter;
 pub use super::nats::NatsReader;
 pub use super::nats::NatsWriter;
 pub use super::postgres::{PsqlWriter, SslError};
@@ -1492,68 +1491,6 @@ impl Writer for KafkaWriter {
     }
 }
 
-pub struct ElasticSearchWriter {
-    client: Elasticsearch,
-    index_name: String,
-    max_batch_size: Option<usize>,
-
-    docs_buffer: Vec<Vec<u8>>,
-}
-
-impl ElasticSearchWriter {
-    pub fn new(client: Elasticsearch, index_name: String, max_batch_size: Option<usize>) -> Self {
-        ElasticSearchWriter {
-            client,
-            index_name,
-            max_batch_size,
-            docs_buffer: Vec::new(),
-        }
-    }
-}
-
-impl Writer for ElasticSearchWriter {
-    fn write(&mut self, data: FormatterContext) -> Result<(), WriteError> {
-        for payload in data.payloads {
-            self.docs_buffer.push(b"{\"index\": {}}".to_vec());
-            self.docs_buffer.push(payload.into_raw_bytes()?);
-        }
-
-        if let Some(max_batch_size) = self.max_batch_size {
-            if self.docs_buffer.len() / 2 >= max_batch_size {
-                self.flush(true)?;
-            }
-        }
-
-        Ok(())
-    }
-
-    fn flush(&mut self, _forced: bool) -> Result<(), WriteError> {
-        if self.docs_buffer.is_empty() {
-            return Ok(());
-        }
-        create_async_tokio_runtime()?.block_on(async {
-            self.client
-                .bulk(BulkParts::Index(&self.index_name))
-                .body(take(&mut self.docs_buffer))
-                .send()
-                .await
-                .map_err(WriteError::Elasticsearch)?
-                .error_for_status_code()
-                .map_err(WriteError::Elasticsearch)?;
-
-            Ok(())
-        })
-    }
-
-    fn name(&self) -> String {
-        format!("ElasticSearch({})", self.index_name)
-    }
-
-    fn single_threaded(&self) -> bool {
-        false
-    }
-}
-
 #[derive(Default, Debug)]
 pub struct NullWriter;
```
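The `flush` removed here constructed a runtime with `create_async_tokio_runtime()` on every call. As a rough illustration of why that matters, here is a hypothetical stand-alone micro-benchmark (not part of this commit) comparing per-call runtime construction against a single reused runtime:

```rust
use std::time::Instant;

fn main() {
    const N: u32 = 1_000;

    // Old shape: a fresh runtime per flush, as in the removed code above.
    let start = Instant::now();
    for _ in 0..N {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to build runtime");
        rt.block_on(async {});
    }
    println!("fresh runtime per call: {:?}", start.elapsed());

    // New shape: one runtime, reused for every block_on.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("failed to build runtime");
    let start = Instant::now();
    for _ in 0..N {
        rt.block_on(async {});
    }
    println!("single reused runtime: {:?}", start.elapsed());
}
```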

src/connectors/elasticsearch.rs

Lines changed: 78 additions & 0 deletions
```diff
@@ -0,0 +1,78 @@
+// Copyright © 2026 Pathway
+
+use std::mem::take;
+
+use crate::async_runtime::create_async_tokio_runtime;
+use crate::connectors::data_format::FormatterContext;
+use crate::connectors::{WriteError, Writer};
+
+use elasticsearch::{BulkParts, Elasticsearch};
+use tokio::runtime::Runtime as TokioRuntime;
+
+pub struct ElasticSearchWriter {
+    runtime: TokioRuntime,
+    client: Elasticsearch,
+    index_name: String,
+    max_batch_size: Option<usize>,
+
+    docs_buffer: Vec<Vec<u8>>,
+}
+
+impl ElasticSearchWriter {
+    pub fn new(
+        client: Elasticsearch,
+        index_name: String,
+        max_batch_size: Option<usize>,
+    ) -> Result<Self, WriteError> {
+        Ok(ElasticSearchWriter {
+            runtime: create_async_tokio_runtime()?,
+            client,
+            index_name,
+            max_batch_size,
+            docs_buffer: Vec::new(),
+        })
+    }
+}
+
+impl Writer for ElasticSearchWriter {
+    fn write(&mut self, data: FormatterContext) -> Result<(), WriteError> {
+        for payload in data.payloads {
+            self.docs_buffer.push(b"{\"index\": {}}".to_vec());
+            self.docs_buffer.push(payload.into_raw_bytes()?);
+        }
+
+        if let Some(max_batch_size) = self.max_batch_size {
+            if self.docs_buffer.len() / 2 >= max_batch_size {
+                self.flush(true)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    fn flush(&mut self, _forced: bool) -> Result<(), WriteError> {
+        if self.docs_buffer.is_empty() {
+            return Ok(());
+        }
+        self.runtime.block_on(async {
+            self.client
+                .bulk(BulkParts::Index(&self.index_name))
+                .body(take(&mut self.docs_buffer))
+                .send()
+                .await
+                .map_err(WriteError::Elasticsearch)?
+                .error_for_status_code()
+                .map_err(WriteError::Elasticsearch)?;
+
+            Ok(())
+        })
+    }
+
+    fn name(&self) -> String {
+        format!("ElasticSearch({})", self.index_name)
+    }
+
+    fn single_threaded(&self) -> bool {
+        false
+    }
+}
```
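Note that `write` pushes two buffer entries per document: an `{"index": {}}` action line followed by the document payload, mirroring the newline-delimited body the Elasticsearch bulk API consumes; that is why the batch check divides `docs_buffer.len()` by 2. A self-contained sketch of that pairing, using hypothetical payloads:

```rust
fn main() {
    // Hypothetical documents standing in for formatter payloads.
    let payloads = [br#"{"id": 1}"#.to_vec(), br#"{"id": 2}"#.to_vec()];

    let mut docs_buffer: Vec<Vec<u8>> = Vec::new();
    for payload in payloads {
        docs_buffer.push(b"{\"index\": {}}".to_vec()); // bulk action line
        docs_buffer.push(payload); // document source line
    }

    // Each document occupies two buffer slots, hence the len() / 2 batch check.
    assert_eq!(docs_buffer.len() / 2, 2);

    // The bulk endpoint consumes these entries as newline-delimited JSON.
    for line in &docs_buffer {
        println!("{}", String::from_utf8_lossy(line));
    }
}
```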

src/connectors/mod.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -23,6 +23,7 @@ pub mod data_format;
 pub mod data_lake;
 pub mod data_storage;
 pub mod data_tokenize;
+pub mod elasticsearch;
 pub mod metadata;
 pub mod monitoring;
 pub mod nats;
```

src/python_api.rs

Lines changed: 5 additions & 1 deletion
```diff
@@ -6354,7 +6354,11 @@ impl DataStorage {
         let index_name = elasticsearch_client_params.index_name.clone();
         let max_batch_size = self.max_batch_size;
 
-        let writer = ElasticSearchWriter::new(client, index_name, max_batch_size);
+        let writer = ElasticSearchWriter::new(client, index_name, max_batch_size).map_err(|e| {
+            PyRuntimeError::new_err(format!(
+                "Failed to create async runtime for ElasticSearch writer: {e}"
+            ))
+        })?;
         Ok(Box::new(writer))
     }
```
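Since `ElasticSearchWriter::new` is now fallible, the binding maps the Rust error into a Python `RuntimeError`. A minimal pyo3 sketch of the same `map_err` pattern, with a hypothetical writer type standing in for the real one:

```rust
use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;
use tokio::runtime::Runtime;

// Hypothetical stand-in for the writer: construction can fail if the
// Tokio runtime cannot be built.
struct SketchWriter {
    _runtime: Runtime,
}

impl SketchWriter {
    fn new() -> std::io::Result<Self> {
        Ok(Self {
            _runtime: Runtime::new()?,
        })
    }
}

// Surface the fallible constructor to Python: the Rust error becomes a
// Python RuntimeError carrying a descriptive message.
#[pyfunction]
fn create_writer() -> PyResult<()> {
    let _writer = SketchWriter::new().map_err(|e| {
        PyRuntimeError::new_err(format!("Failed to create async runtime for writer: {e}"))
    })?;
    Ok(())
}
```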