Skip to content

Commit 23b6e68

Browse files
authored
docs(kafka): extract inline code examples into snippet files (#1198)
1 parent f6a90e5 commit 23b6e68

7 files changed

Lines changed: 466 additions & 387 deletions

File tree

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// This file is referenced by docs/utilities/kafka.md
2+
// via pymdownx.snippets (mkdocs).
3+
4+
namespace AWS.Lambda.Powertools.Docs.Snippets.Kafka;
5+
6+
// --8<-- [start:record_metadata]
7+
using AWS.Lambda.Powertools.Kafka;
8+
using AWS.Lambda.Powertools.Kafka.Protobuf;
9+
using AWS.Lambda.Powertools.Logging;
10+
11+
string Handler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
12+
{
13+
foreach (var record in records)
14+
{
15+
// Log record coordinates for tracing
16+
Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
17+
Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
18+
Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
19+
20+
// Process message headers
21+
foreach (var header in record.Headers.DecodedValues())
22+
{
23+
Logger.LogInformation($"{header.Key}: {header.Value}");
24+
}
25+
26+
// Access the Avro deserialized message content
27+
CustomerProfile customerProfile = record.Value; // CustomerProfile class is auto-generated from Protobuf schema
28+
Logger.LogInformation("Processing order for: {fullName}", customerProfile.FullName);
29+
}
30+
}
31+
32+
await LambdaBootstrapBuilder.Create((Func<ConsumerRecords<string, CustomerProfile>, ILambdaContext, string>?)Handler,
33+
new PowertoolsKafkaProtobufSerializer()) // Use PowertoolsKafkaProtobufSerializer for Protobuf serialization
34+
.Build()
35+
.RunAsync();
36+
// --8<-- [end:record_metadata]
37+
38+
// --8<-- [start:error_handling]
39+
using AWS.Lambda.Powertools.Kafka;
40+
using AWS.Lambda.Powertools.Kafka.Protobuf;
41+
using AWS.Lambda.Powertools.Logging;
42+
43+
var successfulRecords = 0;
44+
var failedRecords = 0;
45+
46+
string Handler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
47+
{
48+
foreach (var record in records)
49+
{
50+
try
51+
{
52+
// Process each record
53+
Logger.LogInformation("Processing record from topic: {topic}", record.Topic);
54+
Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
55+
56+
// Access the deserialized message content
57+
CustomerProfile customerProfile = record.Value; // CustomerProfile class is auto-generated from Protobuf schema
58+
ProcessOrder(customerProfile);
59+
successfulRecords ++;
60+
}
61+
catch (Exception ex)
62+
{
63+
failedRecords ++;
64+
65+
// Log the error and continue processing other records
66+
Logger.LogError(ex, "Error processing record from topic: {topic}, partition: {partition}, offset: {offset}",
67+
record.Topic, record.Partition, record.Offset);
68+
69+
SendToDeadLetterQueue(record, ex); // Optional: Send to a dead-letter queue for further analysis
70+
}
71+
72+
Logger.LogInformation("Record Value: {@record}", record.Value);
73+
}
74+
75+
return $"Processed {successfulRecords} records successfully, {failedRecords} records failed";
76+
}
77+
78+
private void ProcessOrder(CustomerProfile customerProfile)
79+
{
80+
Logger.LogInformation("Processing order for: {fullName}", customerProfile.FullName);
81+
// Your business logic to process the order
82+
// This could throw exceptions for various reasons (e.g., validation errors, database issues)
83+
}
84+
85+
private void SendToDeadLetterQueue(ConsumerRecord<string, CustomerProfile> record, Exception ex)
86+
{
87+
// Implement your dead-letter queue logic here
88+
Logger.LogError("Sending record to dead-letter queue: {record}, error: {error}", record, ex.Message);
89+
}
90+
91+
await LambdaBootstrapBuilder.Create((Func<ConsumerRecords<string, CustomerProfile>, ILambdaContext, string>?)Handler,
92+
new PowertoolsKafkaProtobufSerializer()) // Use PowertoolsKafkaProtobufSerializer for Protobuf serialization
93+
.Build()
94+
.RunAsync();
95+
// --8<-- [end:error_handling]
96+
97+
// --8<-- [start:idempotent_processing]
98+
using Amazon.Lambda.Core;
99+
using AWS.Lambda.Powertools.Kafka;
100+
using AWS.Lambda.Powertools.Kafka.Protobuf;
101+
using AWS.Lambda.Powertools.Logging;
102+
using AWS.Lambda.Powertools.Idempotency;
103+
104+
[assembly: LambdaSerializer(typeof(PowertoolsKafkaProtobufSerializer))]
105+
106+
namespace ProtoBufClassLibrary;
107+
108+
public class Function
109+
{
110+
public Function()
111+
{
112+
Idempotency.Configure(builder => builder.UseDynamoDb("idempotency_table"));
113+
}
114+
115+
public string FunctionHandler(ConsumerRecords<string, Payment> records, ILambdaContext context)
116+
{
117+
foreach (var record in records)
118+
{
119+
ProcessPayment(record.Key, record.Value);
120+
}
121+
122+
return "Processed " + records.Count() + " records";
123+
}
124+
125+
[Idempotent]
126+
private void ProcessPayment(Payment payment)
127+
{
128+
Logger.LogInformation("Processing payment {paymentId} for customer {customerName}",
129+
payment.Id, payment.CustomerName);
130+
131+
// Your payment processing logic here
132+
// This could involve calling an external payment service, updating a database, etc.
133+
}
134+
}
135+
// --8<-- [end:idempotent_processing]
136+
137+
// --8<-- [start:cross_language_compatibility]
138+
using AWS.Lambda.Powertools.Kafka;
139+
using AWS.Lambda.Powertools.Kafka.Protobuf;
140+
using AWS.Lambda.Powertools.Logging;
141+
142+
string Handler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
143+
{
144+
foreach (var record in records)
145+
{
146+
Logger.LogInformation("Record Value: {@record}", record.Value);
147+
}
148+
149+
return "Processed " + records.Count() + " records";
150+
}
151+
152+
await LambdaBootstrapBuilder.Create((Func<ConsumerRecords<string, CustomerProfile>, ILambdaContext, string>?)Handler,
153+
new PowertoolsKafkaProtobufSerializer()) // Use PowertoolsKafkaProtobufSerializer for Protobuf serialization
154+
.Build()
155+
.RunAsync();
156+
157+
// Example class that handles Python snake_case field names
158+
public partial class CustomerProfile
159+
{
160+
[JsonPropertyName("user_id")] public string UserId { get; set; }
161+
162+
[JsonPropertyName("full_name")] public string FullName { get; set; }
163+
164+
[JsonPropertyName("age")] public long Age { get; set; }
165+
166+
[JsonPropertyName("account_status")] public string AccountStatus { get; set; }
167+
}
168+
// --8<-- [end:cross_language_compatibility]
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// This file is referenced by docs/utilities/kafka.md
2+
// via pymdownx.snippets (mkdocs).
3+
4+
namespace AWS.Lambda.Powertools.Docs.Snippets.Kafka;
5+
6+
// --8<-- [start:key_value_deserialization]
7+
using AWS.Lambda.Powertools.Kafka;
8+
using AWS.Lambda.Powertools.Kafka.Protobuf;
9+
using AWS.Lambda.Powertools.Logging;
10+
11+
string Handler(ConsumerRecords<CustomerKey, CustomerProfile> records, ILambdaContext context)
12+
{
13+
foreach (var record in records)
14+
{
15+
Logger.LogInformation("Record Value: {@record}", record.Value);
16+
}
17+
18+
return "Processed " + records.Count() + " records";
19+
}
20+
21+
await LambdaBootstrapBuilder.Create((Func<ConsumerRecords<CustomerKey, CustomerProfile>, ILambdaContext, string>?)Handler,
22+
new PowertoolsKafkaProtobufSerializer()) // Use PowertoolsKafkaProtobufSerializer for Protobuf serialization
23+
.Build()
24+
.RunAsync();
25+
// --8<-- [end:key_value_deserialization]
26+
27+
// --8<-- [start:value_only_deserialization]
28+
using AWS.Lambda.Powertools.Kafka;
29+
using AWS.Lambda.Powertools.Kafka.Protobuf;
30+
using AWS.Lambda.Powertools.Logging;
31+
32+
string Handler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
33+
{
34+
foreach (var record in records)
35+
{
36+
Logger.LogInformation("Record Value: {@record}", record.Value);
37+
}
38+
39+
return "Processed " + records.Count() + " records";
40+
}
41+
42+
await LambdaBootstrapBuilder.Create((Func<ConsumerRecords<string, CustomerProfile>, ILambdaContext, string>?)Handler,
43+
new PowertoolsKafkaProtobufSerializer()) // Use PowertoolsKafkaProtobufSerializer for Protobuf serialization
44+
.Build()
45+
.RunAsync();
46+
// --8<-- [end:value_only_deserialization]
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// This file is referenced by docs/utilities/kafka.md
2+
// via pymdownx.snippets (mkdocs).
3+
4+
namespace AWS.Lambda.Powertools.Docs.Snippets.Kafka;
5+
6+
// --8<-- [start:class_library_deployment]
7+
using AWS.Lambda.Powertools.Kafka;
8+
using AWS.Lambda.Powertools.Kafka.Avro;
9+
using AWS.Lambda.Powertools.Logging;
10+
11+
[assembly: LambdaSerializer(typeof(PowertoolsKafkaAvroSerializer))] // Use PowertoolsKafkaAvroSerializer for Avro serialization
12+
13+
namespace MyKafkaConsumer;
14+
15+
public class Function
16+
{
17+
public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
18+
{
19+
foreach (var record in records)
20+
{
21+
Logger.LogInformation("Record Value: {@record}", record.Value);
22+
}
23+
24+
return "Processed " + records.Count() + " records";
25+
}
26+
}
27+
// --8<-- [end:class_library_deployment]
28+
29+
// --8<-- [start:top_level_deployment]
30+
using AWS.Lambda.Powertools.Kafka;
31+
using AWS.Lambda.Powertools.Kafka.Avro;
32+
using AWS.Lambda.Powertools.Logging;
33+
34+
string Handler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
35+
{
36+
foreach (var record in records)
37+
{
38+
Logger.LogInformation("Record Value: {@record}", record.Value);
39+
}
40+
41+
return "Processed " + records.Count() + " records";
42+
}
43+
await LambdaBootstrapBuilder.Create((Func<ConsumerRecords<string, CustomerProfile>, ILambdaContext, string>?)Handler,
44+
new PowertoolsKafkaAvroSerializer()) // Use PowertoolsKafkaAvroSerializer for Avro serialization
45+
.Build()
46+
.RunAsync();
47+
// --8<-- [end:top_level_deployment]
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// This file is referenced by docs/utilities/kafka.md
2+
// via pymdownx.snippets (mkdocs).
3+
4+
namespace AWS.Lambda.Powertools.Docs.Snippets.Kafka;
5+
6+
// --8<-- [start:primitive_key]
7+
using AWS.Lambda.Powertools.Kafka;
8+
using AWS.Lambda.Powertools.Kafka.Protobuf;
9+
using AWS.Lambda.Powertools.Logging;
10+
11+
string Handler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
12+
{
13+
foreach (var record in records)
14+
{
15+
Logger.LogInformation("Record Value: {@record}", record.Value);
16+
}
17+
18+
return "Processed " + records.Count() + " records";
19+
}
20+
21+
await LambdaBootstrapBuilder.Create((Func<ConsumerRecords<string, CustomerProfile>, ILambdaContext, string>?)Handler,
22+
new PowertoolsKafkaProtobufSerializer()) // Use PowertoolsKafkaProtobufSerializer for Protobuf serialization
23+
.Build()
24+
.RunAsync();
25+
// --8<-- [end:primitive_key]
26+
27+
// --8<-- [start:primitive_key_and_value]
28+
using AWS.Lambda.Powertools.Kafka;
29+
using AWS.Lambda.Powertools.Kafka.Protobuf;
30+
using AWS.Lambda.Powertools.Logging;
31+
32+
string Handler(ConsumerRecords<string, string> records, ILambdaContext context)
33+
{
34+
foreach (var record in records)
35+
{
36+
Logger.LogInformation("Record Value: {@record}", record.Value);
37+
}
38+
39+
return "Processed " + records.Count() + " records";
40+
}
41+
42+
await LambdaBootstrapBuilder.Create((Func<ConsumerRecords<string, string>, ILambdaContext, string>?)Handler,
43+
new PowertoolsKafkaProtobufSerializer()) // Use PowertoolsKafkaProtobufSerializer for Protobuf serialization
44+
.Build()
45+
.RunAsync();
46+
// --8<-- [end:primitive_key_and_value]
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// This file is referenced by docs/utilities/kafka.md
2+
// via pymdownx.snippets (mkdocs).
3+
4+
namespace AWS.Lambda.Powertools.Docs.Snippets.Kafka;
5+
6+
// --8<-- [start:avro_messages]
7+
using AWS.Lambda.Powertools.Kafka;
8+
using AWS.Lambda.Powertools.Kafka.Avro;
9+
using AWS.Lambda.Powertools.Logging;
10+
11+
string Handler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
12+
{
13+
foreach (var record in records)
14+
{
15+
Logger.LogInformation("Record Value: {@record}", record.Value);
16+
}
17+
18+
return "Processed " + records.Count() + " records";
19+
}
20+
21+
await LambdaBootstrapBuilder.Create((Func<ConsumerRecords<string, CustomerProfile>, ILambdaContext, string>?)Handler,
22+
new PowertoolsKafkaAvroSerializer()) // Use PowertoolsKafkaAvroSerializer for Avro serialization
23+
.Build()
24+
.RunAsync();
25+
// --8<-- [end:avro_messages]
26+
27+
// --8<-- [start:protobuf_messages]
28+
using AWS.Lambda.Powertools.Kafka;
29+
using AWS.Lambda.Powertools.Kafka.Protobuf;
30+
using AWS.Lambda.Powertools.Logging;
31+
32+
string Handler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
33+
{
34+
foreach (var record in records)
35+
{
36+
Logger.LogInformation("Record Value: {@record}", record.Value);
37+
}
38+
39+
return "Processed " + records.Count() + " records";
40+
}
41+
42+
await LambdaBootstrapBuilder.Create((Func<ConsumerRecords<string, CustomerProfile>, ILambdaContext, string>?)Handler,
43+
new PowertoolsKafkaProtobufSerializer()) // Use PowertoolsKafkaProtobufSerializer for Protobuf serialization
44+
.Build()
45+
.RunAsync();
46+
// --8<-- [end:protobuf_messages]
47+
48+
// --8<-- [start:json_messages]
49+
using AWS.Lambda.Powertools.Kafka;
50+
using AWS.Lambda.Powertools.Kafka.Json;
51+
using AWS.Lambda.Powertools.Logging;
52+
53+
string Handler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
54+
{
55+
foreach (var record in records)
56+
{
57+
Logger.LogInformation("Record Value: {@record}", record.Value);
58+
}
59+
60+
return "Processed " + records.Count() + " records";
61+
}
62+
63+
await LambdaBootstrapBuilder.Create((Func<ConsumerRecords<string, CustomerProfile>, ILambdaContext, string>?)Handler,
64+
new PowertoolsKafkaJsonSerializer()) // Use PowertoolsKafkaJsonSerializer for Json serialization
65+
.Build()
66+
.RunAsync();
67+
// --8<-- [end:json_messages]

0 commit comments

Comments
 (0)