- flowlogs/ec2: Fetch both CloudWatch and S3 flow log destinations.
Introduce FetchedFlowLogs{by_log_group, by_bucket} as the return type
of fetch_all_flow_logs(). Add extract_bucket_from_arn() to parse the
S3 destination ARN into a plain bucket name.
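A minimal sketch of what extract_bucket_from_arn() could look like, assuming the S3 destination ARN has the standard `arn:aws:s3:::bucket-name` form (possibly with a key prefix after the bucket); the exact signature in the codebase may differ.

```rust
// Sketch: parse an S3 destination ARN into a plain bucket name.
// Assumes the standard S3 ARN shape "arn:aws:s3:::bucket[/prefix...]".
fn extract_bucket_from_arn(arn: &str) -> Option<String> {
    arn.strip_prefix("arn:aws:s3:::")
        // Reject ARNs for other services (e.g. CloudWatch log groups).
        .filter(|rest| !rest.is_empty())
        // If the ARN carries a key prefix ("bucket/prefix"), keep only the
        // bucket component before the first '/'.
        .map(|rest| rest.split('/').next().unwrap_or(rest).to_string())
}
```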
- flowlogs/cache: Replace the single inner map with two independent maps
(by_log_group for CloudWatch, by_bucket for S3). Add symmetric
get/get_mut/insert methods for each map. Update CacheSnapshot with
the same split (#[serde(default)] on both fields for backward
compatibility with persisted cache files). len() sums both maps.
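The split cache shape can be sketched as follows; `FlowLogConfig` here is a stand-in for the real config type, and the serde attributes on CacheSnapshot are omitted since this sketch only shows the two-map layout and the summed len().

```rust
use std::collections::HashMap;

// Stand-in for the real flow log config type.
#[derive(Clone, Debug, PartialEq)]
struct FlowLogConfig {
    flow_log_id: String, // hypothetical field, for illustration only
}

// Two independent maps: CloudWatch destinations keyed by log group,
// S3 destinations keyed by bucket name.
#[derive(Default)]
struct FlowLogCache {
    by_log_group: HashMap<String, FlowLogConfig>,
    by_bucket: HashMap<String, FlowLogConfig>,
}

impl FlowLogCache {
    fn insert_by_log_group(&mut self, key: String, cfg: FlowLogConfig) {
        self.by_log_group.insert(key, cfg);
    }
    fn insert_by_bucket(&mut self, key: String, cfg: FlowLogConfig) {
        self.by_bucket.insert(key, cfg);
    }
    fn get_by_bucket(&self, key: &str) -> Option<&FlowLogConfig> {
        self.by_bucket.get(key)
    }
    // len() sums both maps, as described above.
    fn len(&self) -> usize {
        self.by_log_group.len() + self.by_bucket.len()
    }
}
```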
- flowlogs/mod: Add get_config_by_bucket() as a first-class lookup by
S3 bucket name, mirroring the existing get_config() for CloudWatch
log groups. Extract shared helpers ensure_cache_fresh() and
ensure_parsed_fields() to deduplicate the two lookup paths.
fetch_and_update_all() populates both cache maps from FetchedFlowLogs.
- s3logs/mod: Parser gains a lifetime and stores &'a mut FlowLogManager
(passed via new(), used via self in parse()). Per-record flow log
config is resolved before spawning each sub-task by calling
self.flow_log_manager.get_config_by_bucket(). Add ParserType::VpcLog.
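The borrowing pattern described for the parser can be sketched like this, with `FlowLogManager` and `FlowLogConfig` reduced to stand-in types; the point is the `&'a mut` field and resolving config through `self` before handing work to a sub-task.

```rust
// Hypothetical stand-ins for the real types.
struct FlowLogConfig;
struct FlowLogManager;

impl FlowLogManager {
    fn get_config_by_bucket(&mut self, _bucket: &str) -> Option<FlowLogConfig> {
        Some(FlowLogConfig) // stand-in lookup
    }
}

// The parser borrows the manager mutably for its lifetime.
struct Parser<'a> {
    flow_log_manager: &'a mut FlowLogManager,
}

impl<'a> Parser<'a> {
    fn new(flow_log_manager: &'a mut FlowLogManager) -> Self {
        Parser { flow_log_manager }
    }

    // Per-record config is resolved via self before spawning sub-tasks.
    fn parse(&mut self, bucket: &str) -> bool {
        self.flow_log_manager.get_config_by_bucket(bucket).is_some()
    }
}
```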
- s3logs/s3record: S3Record carries an Option<FlowLogConfig>. When
present, parse_log_lines() selects LogPlatform::VpcFlowLog /
ParserType::VpcLog, skips header lines (version … / # …), emits
ec2.flow-logs.tags.* resource attributes, and calls
parse_vpclog_to_map() for structured field attributes — matching the
existing CloudWatch VPC flow log behaviour exactly.
- forward/forwarder: Pass &mut self.flow_log_manager to Parser::new().
Drive parser.parse() on the current task (not a spawned future) using
a select! loop that concurrently drains result_rx, preserving the
original streaming/pipelining behaviour while keeping the non-Send
FlowLogManager on the current task.
The two identical match-on-join_next blocks in parse() — one inside the
concurrency-limit while loop and one in the drain-remaining loop — are
replaced by a single private async helper:
async fn drain_one_task(
tasks: &mut JoinSet<...>,
result_tx: &mpsc::Sender<...>,
total: &mut usize,
request_id: &str,
) -> Result<bool, ParserError>
Returns Ok(true) when a task was processed, Ok(false) when the set was
empty (causes the caller to break), and Err to abort. The two call sites
become a one-liner each.
Instead of skipping the first line of an S3 VPC flow log file and relying on
the pre-parsed fields from FlowLogConfig, parse the header line itself to
build the ParsedFields for that file. The header line is the first
non-comment line and contains space-separated field names, e.g.:
version account-id interface-id srcaddr dstaddr srcport dstport ...
parse_log_lines() now:
1. Skips any leading '#' comment lines.
2. Treats the next line as the column header and builds a Vec<ParsedField>
from it using get_field_type() for type assignment.
3. Passes the remaining lines as data to the existing parse_line() path.
This is more robust than relying on EC2 API metadata because the header in
the file is authoritative for the exact columns present. FlowLogConfig is
still required (to gate VpcLog detection and supply flow-log tags), but its
parsed_fields field is no longer consulted for S3 files.
Also re-exports get_field_type from the flowlogs crate so s3record.rs can
use it without reaching into the private cache module.
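The three steps above can be sketched as a small helper; field names are kept as plain strings here (the real code builds typed ParsedField values via get_field_type()).

```rust
// Sketch: split an S3 VPC flow log file into (header field names, data lines).
fn split_header_and_data<'a>(lines: &'a [&'a str]) -> (Vec<String>, &'a [&'a str]) {
    // 1. Skip any leading '#' comment lines.
    let start = lines
        .iter()
        .position(|l| !l.trim_start().starts_with('#'))
        .unwrap_or(lines.len());
    match lines[start..].split_first() {
        // 2. The next line is the space-separated column header.
        Some((header, data)) => (
            header.split_whitespace().map(str::to_string).collect(),
            // 3. The remaining lines are data for the existing parse path.
            data,
        ),
        None => (Vec::new(), &[]),
    }
}
```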
Move header-to-ParsedField conversion into parse/vpclog.rs as a new
pub(crate) function:
pub(crate) fn parse_vpclog_header(header: &str) -> Vec<ParsedField>
It splits the header on whitespace and calls get_field_type() for each
token, identical to what parse_log_format() does for the ${field}
format string used by CloudWatch-delivered logs.
s3record.rs now calls parse_vpclog_header() on lines.split_first() and
treats the returned slice as data lines — no comment-line handling,
no get_field_type import, no inline field-building logic.
Remove the comment-line test (VPC flow log files have no comment lines)
and add four unit tests in vpclog.rs covering types, parity with
parse_log_format, unknown-field default, and empty input.
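A sketch of parse_vpclog_header() and its parity with the ${field}-style format parser; ParsedField and get_field_type() are simplified stand-ins (the real type assignment is richer than a boolean).

```rust
// Simplified stand-in for the real ParsedField type.
#[derive(Debug, PartialEq, Clone)]
struct ParsedField {
    name: String,
    is_int: bool, // stand-in for the real field-type enum
}

// Stand-in for get_field_type(): a few known numeric VPC flow log fields.
fn get_field_type(name: &str) -> bool {
    matches!(name, "srcport" | "dstport" | "packets" | "bytes")
}

// Space-separated header line, e.g. "version srcaddr srcport".
fn parse_vpclog_header(header: &str) -> Vec<ParsedField> {
    header
        .split_whitespace()
        .map(|n| ParsedField { name: n.to_string(), is_int: get_field_type(n) })
        .collect()
}

// CloudWatch-style format string, e.g. "${version} ${srcaddr} ${srcport}".
fn parse_log_format(format: &str) -> Vec<ParsedField> {
    format
        .split_whitespace()
        .filter_map(|t| t.strip_prefix("${").and_then(|t| t.strip_suffix('}')))
        .map(|n| ParsedField { name: n.to_string(), is_int: get_field_type(n) })
        .collect()
}
```

The parity property is exactly what the tests described above exercise: a whitespace header and the equivalent ${field} format string must yield identical field lists.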
The call site was doing line.to_string() to convert the &str slice from
data_lines into an owned String before passing it to parse_line, which
then cloned or moved that String into sub-functions. This caused an
allocation per line that was never needed at that point.
parse_line now takes line: &str. Each arm calls .to_string() exactly
once, at the point where ownership is actually required:
VpcLog: set_body_text(line.to_string()) and
parse_vpclog_to_map(line.to_string(), ...)
Json: parse_json_to_map(line.to_string()), and
set_body_text(line.to_string()) on the error path only
Unknown: parse_json_to_map(line.to_string()) for the JSON branch, or
set_body_text(line.to_string()) for the plain-text branch
The Json arm also loses a redundant intermediate clone: it previously
did parse_json_to_map(line.clone()).map_err(|e| (e, line.clone())) to
carry the raw string through the error, then pulled it back out of the
tuple. It now calls parse_json_to_map(line.to_string()) directly and
converts line on the error path instead.
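The ownership pattern at this step can be sketched as below: parse_line borrows the line, and each arm calls .to_string() only where an owned String is actually handed off. Record and the helper are hypothetical stand-ins, and only two arms are shown.

```rust
// Stand-in record type.
#[derive(Default, Debug)]
struct Record {
    body: Option<String>,
    fields: Vec<(String, i64)>,
}

impl Record {
    fn set_body_text(&mut self, body: String) {
        self.body = Some(body);
    }
}

// Stand-in for parse_vpclog_to_map(): consumes an owned String, as at this
// point in the change history (a later change switches it to &str).
fn parse_vpclog_to_map(line: String) -> Vec<(String, i64)> {
    line.split_whitespace()
        .enumerate()
        .map(|(i, f)| (f.to_string(), i as i64))
        .collect()
}

enum ParserType {
    VpcLog,
    PlainText,
}

// parse_line takes &str; each arm allocates only at the ownership boundary.
fn parse_line(record: &mut Record, parser: &ParserType, line: &str) {
    match parser {
        ParserType::VpcLog => {
            record.set_body_text(line.to_string());
            record.fields = parse_vpclog_to_map(line.to_string());
        }
        ParserType::PlainText => record.set_body_text(line.to_string()),
    }
}
```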
The function only needs an owned String on the field-count mismatch error
path, where the raw input is stored in RecordParserError. The integer-parse
error path already holds an owned String (a clone of the field value from
field_values). parse_vpclog_fields already took &str internally.
Changing the signature to &str:
- s3logs/s3record.rs: line.to_string() on the parse_vpclog_to_map call is
removed; the &str from parse_line is passed directly, so the VpcLog arm
now allocates exactly one String (for set_body_text) on the happy path
instead of two.
- cwlogs/record_parser.rs: message.clone() becomes &message, dropping the
clone entirely; message is still moved into ParsedMessage::PlainText on
the success path as before.
Both functions only needed an owned String to store in RecordParserError
on their error paths; the actual parsing (serde_json::from_str,
parse_keyvalue_pairs) already worked on &str internally.
parse_json_to_map(&str):
- serde_json::from_str(msg) directly, no &-deref needed
- .to_string() only on the two error paths (non-object JSON, parse
failure) that move the raw input into RecordParserError
parse_keyvalue_to_map(&str):
- parse_keyvalue_pairs(input) directly, the & was already there
- .to_string() only on the empty-pairs error path
Call site changes:
cwlogs/record_parser.rs: parse_message() passes &message to all three
parse_json_to_map / parse_keyvalue_to_map call sites; message is
still moved into ParsedMessage::PlainText / Error as before on the
paths that need ownership
s3logs/s3record.rs: line.to_string() on the two parse_json_to_map
calls in parse_line() is removed; line (&str) is passed directly,
with .to_string() remaining only on set_body_text error paths where
ownership is genuinely needed
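The same borrow-then-own pattern can be sketched for the key-value path without pulling in a JSON dependency; RecordParserError and parse_keyvalue_pairs() are simplified stand-ins for the real types.

```rust
// Stand-in error type that carries the raw input on failure.
#[derive(Debug, PartialEq)]
enum RecordParserError {
    EmptyPairs(String),
}

// Stand-in for parse_keyvalue_pairs(): already works on &str internally.
fn parse_keyvalue_pairs(input: &str) -> Vec<(String, String)> {
    input
        .split_whitespace()
        .filter_map(|tok| tok.split_once('='))
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}

// Borrows the input; .to_string() only on the error path that stores it.
fn parse_keyvalue_to_map(input: &str) -> Result<Vec<(String, String)>, RecordParserError> {
    let pairs = parse_keyvalue_pairs(input);
    if pairs.is_empty() {
        return Err(RecordParserError::EmptyPairs(input.to_string()));
    }
    Ok(pairs)
}
```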
mheffner
commented
Mar 11, 2026
now_nanos,
log_entry.timestamp,
vec![string_kv("cloudwatch.id", log_entry.id)],
);
Somewhat unrelated, but cleanup I noticed since cloudwatch.id is never empty in this path.
Support VPC flow logs that are exported to S3, in addition to the existing CloudWatch support. VPC flow logs in S3 contain a header that defines the fields in the file, so we only need to read the tag names from the VPC flow logs entry. In theory, multiple VPC flow logs could be exported to the same bucket and differentiated by folder prefix, so for S3 VPC logs we track the folder prefix and match the config based on the prefix that matches the object key.
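A sketch of the prefix-based config matching described above; the types are illustrative stand-ins, and the longest-prefix-wins tie-break is an assumption, not necessarily what the real code does.

```rust
use std::collections::HashMap;

// Stand-in for the real flow log config type.
#[derive(Clone, Debug, PartialEq)]
struct FlowLogConfig {
    flow_log_id: String,
}

// Configs for one bucket, keyed by folder prefix.
struct BucketConfigs {
    by_prefix: HashMap<String, FlowLogConfig>,
}

impl BucketConfigs {
    // Return the config whose folder prefix matches the object key.
    // Assumption: if several prefixes match, prefer the longest one.
    fn match_key(&self, object_key: &str) -> Option<&FlowLogConfig> {
        self.by_prefix
            .iter()
            .filter(|(prefix, _)| object_key.starts_with(prefix.as_str()))
            .max_by_key(|(prefix, _)| prefix.len())
            .map(|(_, cfg)| cfg)
    }
}
```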
We pass the FlowLogManager across a tokio task spawn boundary, so we must wrap its internal contents in
Arc<Mutex<>>. The state is loaded and refreshed as a single object, so this mostly uses coarse locking, and contention shouldn't be a large concern. Lookups should be fast, and the contents are cloned on return.
Also includes some reduction and push-down of str->String conversions that should reduce allocations.
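The coarse-locking scheme can be sketched as below: the whole state sits behind one Arc<Mutex<..>>, refresh replaces it as a single object, and lookups clone the value out so the lock is held only briefly. Types and method names are stand-ins.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for the real config type; Clone enables clone-on-return lookups.
#[derive(Clone, Debug, PartialEq)]
struct FlowLogConfig {
    flow_log_id: String,
}

// Cloneable handle that can cross a task spawn boundary.
#[derive(Clone)]
struct FlowLogManager {
    state: Arc<Mutex<HashMap<String, FlowLogConfig>>>,
}

impl FlowLogManager {
    fn new() -> Self {
        FlowLogManager { state: Arc::new(Mutex::new(HashMap::new())) }
    }

    // Refresh swaps in the whole map under a single lock acquisition.
    fn refresh(&self, fresh: HashMap<String, FlowLogConfig>) {
        *self.state.lock().unwrap() = fresh;
    }

    // Lookups clone the config on return, so callers never hold the lock.
    fn get_config(&self, key: &str) -> Option<FlowLogConfig> {
        self.state.lock().unwrap().get(key).cloned()
    }
}
```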