Skip to content

Commit c9f487f

Browse files
lmamminojlizen
andauthored
feat(‎lambda-events): Improve ergonomics of SqsBatchResponse and KinesisEventResponse (#1063)
* feat: Improve ergonomics of SqsBatchResponse and KinesisEventResponse * chore: fmt + clippy * chore: better docs * feat: add set_failures helper and improve add_failure ergonomics * feat(kinesis): add add_failure and set_failures helpers to KinesisEventResponse * chore: fixes after review * fix: all tests pass * Update documentation for set_failures method Use `..Default::default()` and add note about default field values for KinesisBatchItemFailure. * Update documentation for add_failure method Use ..Default::default() in add_failure * Clarify BatchItemFailure default values in documentation Added clarification about default field values for BatchItemFailure. * Fix formatting in KinesisBatchItemFailure initialization --------- Co-authored-by: Jess Izen <[email protected]>
1 parent 5c018d5 commit c9f487f

File tree

3 files changed

+242
-0
lines changed

3 files changed

+242
-0
lines changed

lambda-events/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,3 +129,6 @@ catch-all-fields = []
129129

130130
[package.metadata.docs.rs]
131131
all-features = true
132+
133+
[dev-dependencies]
134+
lambda_runtime = { path = "../lambda-runtime" }

lambda-events/src/event/sqs/mod.rs

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,118 @@ pub struct SqsBatchResponse {
154154
pub other: serde_json::Map<String, Value>,
155155
}
156156

157+
impl SqsBatchResponse {
158+
/// Add a failed message ID to the batch response.
159+
///
160+
/// When processing SQS messages in batches, you can use this helper method to
161+
/// register individual message failures. Lambda will automatically return failed
162+
/// messages to the queue for reprocessing while successfully processed messages
163+
/// will be deleted.
164+
///
165+
/// Besides `item_identifiers`, the generated struct will use default field values for [`BatchItemFailure`].
166+
///
167+
/// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures`
168+
/// to be enabled in your Lambda function's SQS event source mapping configuration.
169+
/// Without this setting, Lambda will retry the entire batch on any failure.
170+
///
171+
/// # Example
172+
///
173+
/// ```rust
174+
/// use aws_lambda_events::event::sqs::{SqsEvent, SqsBatchResponse};
175+
/// use lambda_runtime::{service_fn, Error, LambdaEvent};
176+
///
177+
/// async fn function_handler(
178+
/// event: LambdaEvent<SqsEvent>,
179+
/// ) -> Result<SqsBatchResponse, Error> {
180+
/// // Start from a default response
181+
/// let mut response = SqsBatchResponse::default();
182+
///
183+
/// for record in event.payload.records {
184+
/// let message_id = record.message_id.clone().unwrap_or_default();
185+
///
186+
/// // Try to process the message
187+
/// if let Err(e) = process_record(&record).await {
188+
/// println!("Failed to process message {}: {}", message_id, e);
189+
///
190+
/// // Use the helper to register the failure
191+
/// response.add_failure(message_id);
192+
/// }
193+
/// }
194+
///
195+
/// Ok(response)
196+
/// }
197+
///
198+
/// async fn process_record(record: &aws_lambda_events::event::sqs::SqsMessage) -> Result<(), Error> {
199+
/// // Your message processing logic here
200+
/// Ok(())
201+
/// }
202+
/// ```
203+
pub fn add_failure(&mut self, message_id: impl Into<String>) {
204+
self.batch_item_failures.push(BatchItemFailure {
205+
item_identifier: message_id.into(),
206+
..Default::default()
207+
});
208+
}
209+
210+
/// Set multiple failed message IDs at once.
211+
///
212+
/// This is a convenience method for setting all batch item failures in one call.
213+
/// It replaces any previously registered failures.
214+
///
215+
/// Besides `item_identifiers`, the generated struct will use default field values for [`BatchItemFailure`].
216+
///
217+
/// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures`
218+
/// to be enabled in your Lambda function's SQS event source mapping configuration.
219+
/// Without this setting, Lambda will retry the entire batch on any failure.
220+
///
221+
/// # Example
222+
///
223+
/// ```rust
224+
/// use aws_lambda_events::event::sqs::{SqsEvent, SqsBatchResponse};
225+
/// use lambda_runtime::{service_fn, Error, LambdaEvent};
226+
///
227+
/// async fn function_handler(
228+
/// event: LambdaEvent<SqsEvent>,
229+
/// ) -> Result<SqsBatchResponse, Error> {
230+
/// let mut failed_ids = Vec::new();
231+
///
232+
/// for record in event.payload.records {
233+
/// let message_id = record.message_id.clone().unwrap_or_default();
234+
///
235+
/// // Try to process the message
236+
/// if let Err(e) = process_record(&record).await {
237+
/// println!("Failed to process message {}: {}", message_id, e);
238+
/// failed_ids.push(message_id);
239+
/// }
240+
/// }
241+
///
242+
/// // Set all failures at once
243+
/// let mut response = SqsBatchResponse::default();
244+
/// response.set_failures(failed_ids);
245+
///
246+
/// Ok(response)
247+
/// }
248+
///
249+
/// async fn process_record(record: &aws_lambda_events::event::sqs::SqsMessage) -> Result<(), Error> {
250+
/// // Your message processing logic here
251+
/// Ok(())
252+
/// }
253+
/// ```
254+
pub fn set_failures<I, S>(&mut self, message_ids: I)
255+
where
256+
I: IntoIterator<Item = S>,
257+
S: Into<String>,
258+
{
259+
self.batch_item_failures = message_ids
260+
.into_iter()
261+
.map(|id| BatchItemFailure {
262+
item_identifier: id.into(),
263+
..Default::default()
264+
})
265+
.collect();
266+
}
267+
}
268+
157269
#[non_exhaustive]
158270
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
159271
#[serde(rename_all = "camelCase")]
@@ -335,4 +447,33 @@ mod test {
335447
let reparsed: SqsApiEventObj<CustStruct> = serde_json::from_slice(output.as_bytes()).unwrap();
336448
assert_eq!(parsed, reparsed);
337449
}
450+
451+
#[test]
452+
#[cfg(feature = "sqs")]
453+
fn example_sqs_batch_response_add_failure() {
454+
let mut response = SqsBatchResponse::default();
455+
response.add_failure("msg-1".to_string());
456+
response.add_failure("msg-2".to_string());
457+
458+
assert_eq!(response.batch_item_failures.len(), 2);
459+
assert_eq!(response.batch_item_failures[0].item_identifier, "msg-1");
460+
assert_eq!(response.batch_item_failures[1].item_identifier, "msg-2");
461+
}
462+
463+
#[test]
464+
#[cfg(feature = "sqs")]
465+
fn example_sqs_batch_response_set_failures() {
466+
let mut response = SqsBatchResponse::default();
467+
response.set_failures(vec!["msg-1", "msg-2", "msg-3"]);
468+
469+
assert_eq!(response.batch_item_failures.len(), 3);
470+
assert_eq!(response.batch_item_failures[0].item_identifier, "msg-1");
471+
assert_eq!(response.batch_item_failures[1].item_identifier, "msg-2");
472+
assert_eq!(response.batch_item_failures[2].item_identifier, "msg-3");
473+
474+
// Test that set_failures replaces existing failures
475+
response.set_failures(vec!["msg-4".to_string()]);
476+
assert_eq!(response.batch_item_failures.len(), 1);
477+
assert_eq!(response.batch_item_failures[0].item_identifier, "msg-4");
478+
}
338479
}

lambda-events/src/event/streams/mod.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,54 @@ pub struct KinesisEventResponse {
1717
pub other: serde_json::Map<String, Value>,
1818
}
1919

20+
impl KinesisEventResponse {
21+
/// Add a failed item identifier to the batch response.
22+
///
23+
/// When processing Kinesis records in batches, you can use this helper method to
24+
/// register individual record failures. Lambda will automatically retry failed
25+
/// records while successfully processed records will be checkpointed.
26+
///
27+
/// Besides `item_identifiers`, the generated struct will use default field values for [`KinesisBatchItemFailure`].
28+
///
29+
/// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures`
30+
/// to be enabled in your Lambda function's Kinesis event source mapping configuration.
31+
/// Without this setting, Lambda will retry the entire batch on any failure.
32+
pub fn add_failure(&mut self, item_identifier: impl Into<String>) {
33+
self.batch_item_failures.push(KinesisBatchItemFailure {
34+
item_identifier: Some(item_identifier.into()),
35+
#[cfg(feature = "catch-all-fields")]
36+
other: serde_json::Map::new(),
37+
..Default::default()
38+
});
39+
}
40+
41+
/// Set multiple failed item identifiers at once.
42+
///
43+
/// This is a convenience method for setting all batch item failures in one call.
44+
/// It replaces any previously registered failures.
45+
///
46+
/// Besides `item_identifiers`, the generated struct will use default field values for [`KinesisBatchItemFailure`].
47+
///
48+
/// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures`
49+
/// to be enabled in your Lambda function's Kinesis event source mapping configuration.
50+
/// Without this setting, Lambda will retry the entire batch on any failure.
51+
pub fn set_failures<I, S>(&mut self, item_identifiers: I)
52+
where
53+
I: IntoIterator<Item = S>,
54+
S: Into<String>,
55+
{
56+
self.batch_item_failures = item_identifiers
57+
.into_iter()
58+
.map(|id| KinesisBatchItemFailure {
59+
item_identifier: Some(id.into()),
60+
#[cfg(feature = "catch-all-fields")]
61+
other: serde_json::Map::new(),
62+
..Default::default()
63+
})
64+
.collect();
65+
}
66+
}
67+
2068
/// `KinesisBatchItemFailure` is the individual record which failed processing.
2169
#[non_exhaustive]
2270
#[derive(Debug, Default, Clone, Eq, PartialEq, Deserialize, Serialize)]
@@ -94,3 +142,53 @@ pub struct SqsBatchItemFailure {
94142
#[serde(flatten)]
95143
pub other: serde_json::Map<String, Value>,
96144
}
145+
146+
#[cfg(test)]
147+
mod test {
148+
use super::*;
149+
150+
#[test]
151+
fn kinesis_event_response_add_failure() {
152+
let mut response = KinesisEventResponse::default();
153+
response.add_failure("seq-1");
154+
response.add_failure("seq-2".to_string());
155+
156+
assert_eq!(response.batch_item_failures.len(), 2);
157+
assert_eq!(
158+
response.batch_item_failures[0].item_identifier,
159+
Some("seq-1".to_string())
160+
);
161+
assert_eq!(
162+
response.batch_item_failures[1].item_identifier,
163+
Some("seq-2".to_string())
164+
);
165+
}
166+
167+
#[test]
168+
fn kinesis_event_response_set_failures() {
169+
let mut response = KinesisEventResponse::default();
170+
response.set_failures(vec!["seq-1", "seq-2", "seq-3"]);
171+
172+
assert_eq!(response.batch_item_failures.len(), 3);
173+
assert_eq!(
174+
response.batch_item_failures[0].item_identifier,
175+
Some("seq-1".to_string())
176+
);
177+
assert_eq!(
178+
response.batch_item_failures[1].item_identifier,
179+
Some("seq-2".to_string())
180+
);
181+
assert_eq!(
182+
response.batch_item_failures[2].item_identifier,
183+
Some("seq-3".to_string())
184+
);
185+
186+
// Test that set_failures replaces existing failures
187+
response.set_failures(vec!["seq-4".to_string()]);
188+
assert_eq!(response.batch_item_failures.len(), 1);
189+
assert_eq!(
190+
response.batch_item_failures[0].item_identifier,
191+
Some("seq-4".to_string())
192+
);
193+
}
194+
}

0 commit comments

Comments
 (0)