Read VPC logs from S3 #36

Open
mheffner wants to merge 19 commits into main from vpc-logs-s3

@mheffner mheffner commented Mar 11, 2026

Support VPC flow logs that are exported to S3, in addition to the existing CloudWatch support. VPC log files in S3 begin with a header that defines the fields contained in the file, so we only need to read the tag names from the VPC flow log entry. In theory, multiple VPC flow logs could be exported to the same bucket and differentiated by folder prefix. For S3 VPC logs we therefore track the folder prefix and select the config whose prefix matches the object key.
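A minimal sketch of the prefix matching described above, for illustration only. The `FlowLogConfig` fields, the `match_config_by_key` name, and the "longest prefix wins" tie-break are assumptions, not the code in this PR; path-boundary handling is also left out.

```rust
/// Hypothetical stand-in for the per-flow-log configuration.
#[derive(Debug, Clone, PartialEq)]
pub struct FlowLogConfig {
    pub flow_log_id: String,
    /// Folder prefix the flow log writes under, e.g. "team-a/prod/".
    pub folder_prefix: String,
}

/// Return the config whose folder prefix matches the S3 object key.
/// Assumption: when several prefixes match, the longest one is chosen.
pub fn match_config_by_key<'a>(
    configs: &'a [FlowLogConfig],
    object_key: &str,
) -> Option<&'a FlowLogConfig> {
    configs
        .iter()
        .filter(|c| object_key.starts_with(c.folder_prefix.as_str()))
        .max_by_key(|c| c.folder_prefix.len())
}
```

With prefixes `team-a/` and `team-a/prod/` configured, a key under `team-a/prod/` would resolve to the second config, and a key under `team-b/` would resolve to nothing.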

We pass the FlowLogManager across a tokio task spawn boundary, so we must wrap its internal contents in Arc<Mutex<>>. The state is loaded and refreshed as a single object, so this mostly uses coarse locking, and contention shouldn't be a large concern. Lookups should be fast, and the contents are cloned on return.
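Roughly, the locking shape looks like the sketch below. It uses `std::thread::spawn` in place of `tokio::spawn` so that it compiles without extra crates. The struct layout, `refresh()`, and `lookup_on_worker()` are hypothetical; only the `get_config_by_bucket` name comes from this PR, and its signature here is assumed.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the real config type.
#[derive(Debug, Clone, PartialEq)]
pub struct FlowLogConfig {
    pub flow_log_id: String,
}

/// Cloning the manager clones the Arc, so every clone shares one state.
#[derive(Clone, Default)]
pub struct FlowLogManager {
    inner: Arc<Mutex<HashMap<String, FlowLogConfig>>>,
}

impl FlowLogManager {
    /// Replace the whole state under a single coarse lock.
    pub fn refresh(&self, fresh: HashMap<String, FlowLogConfig>) {
        *self.inner.lock().unwrap() = fresh;
    }

    /// Hold the lock only for the map lookup and return a clone,
    /// so the guard is dropped before the caller uses the config.
    pub fn get_config_by_bucket(&self, bucket: &str) -> Option<FlowLogConfig> {
        self.inner.lock().unwrap().get(bucket).cloned()
    }
}

/// Move a clone of the manager across a spawn boundary and look up there.
pub fn lookup_on_worker(manager: &FlowLogManager, bucket: &str) -> Option<FlowLogConfig> {
    let manager = manager.clone();
    let bucket = bucket.to_string();
    thread::spawn(move || manager.get_config_by_bucket(&bucket))
        .join()
        .unwrap()
}
```

Returning a clone keeps the critical section short, at the cost of one allocation per lookup.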

Also removes some str->String conversions and pushes the remaining ones down to the point where ownership is needed, which should reduce allocations.

mheffner added 13 commits March 9, 2026 18:56
- flowlogs/ec2: Fetch both CloudWatch and S3 flow log destinations.
  Introduce FetchedFlowLogs{by_log_group, by_bucket} as the return type
  of fetch_all_flow_logs(). Add extract_bucket_from_arn() to parse the
  S3 destination ARN into a plain bucket name.

- flowlogs/cache: Replace the single inner map with two independent maps
  (by_log_group for CloudWatch, by_bucket for S3). Add symmetric
  get/get_mut/insert methods for each map. Update CacheSnapshot with
  the same split (#[serde(default)] on both fields for backward
  compatibility with persisted cache files). len() sums both maps.

- flowlogs/mod: Add get_config_by_bucket() as a first-class lookup by
  S3 bucket name, mirroring the existing get_config() for CloudWatch
  log groups. Extract shared helpers ensure_cache_fresh() and
  ensure_parsed_fields() to deduplicate the two lookup paths.
  fetch_and_update_all() populates both cache maps from FetchedFlowLogs.

- s3logs/mod: Parser gains a lifetime and stores &'a mut FlowLogManager
  (passed via new(), used via self in parse()). Per-record flow log
  config is resolved before spawning each sub-task by calling
  self.flow_log_manager.get_config_by_bucket(). Add ParserType::VpcLog.

- s3logs/s3record: S3Record carries an Option<FlowLogConfig>. When
  present, parse_log_lines() selects LogPlatform::VpcFlowLog /
  ParserType::VpcLog, skips header lines (version … / # …), emits
  ec2.flow-logs.tags.* resource attributes, and calls
  parse_vpclog_to_map() for structured field attributes — matching the
  existing CloudWatch VPC flow log behaviour exactly.

- forward/forwarder: Pass &mut self.flow_log_manager to Parser::new().
  Drive parser.parse() on the current task (not a spawned future) using
  a select! loop that concurrently drains result_rx, preserving the
  original streaming/pipelining behaviour while keeping the non-Send
  FlowLogManager on the current task.
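For the flowlogs/ec2 item above, ARN-to-bucket extraction could look something like this. It is a sketch only: the return type and the exact validation are assumptions, and the real extract_bucket_from_arn() may differ. It relies on the S3 ARN shape `arn:<partition>:s3:::<bucket>[/<prefix>]`.

```rust
/// Sketch: pull the bucket name out of an S3 destination ARN.
/// Returns None for non-S3 ARNs or when the bucket part is empty.
pub fn extract_bucket_from_arn(arn: &str) -> Option<&str> {
    // ARN layout: arn:partition:service:region:account-id:resource
    let mut parts = arn.splitn(6, ':');
    if parts.next()? != "arn" {
        return None;
    }
    let _partition = parts.next()?;
    if parts.next()? != "s3" {
        return None;
    }
    let _region = parts.next()?; // empty for S3 bucket ARNs
    let _account = parts.next()?; // empty for S3 bucket ARNs
    let resource = parts.next()?;
    // The bucket is everything before the first '/', the rest is the prefix.
    let bucket = resource.split('/').next()?;
    if bucket.is_empty() {
        None
    } else {
        Some(bucket)
    }
}
```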
The two identical match-on-join_next blocks in parse() — one inside the
concurrency-limit while loop and one in the drain-remaining loop — are
replaced by a single private async helper:

  async fn drain_one_task(
      tasks: &mut JoinSet<...>,
      result_tx: &mpsc::Sender<...>,
      total: &mut usize,
      request_id: &str,
  ) -> Result<bool, ParserError>

Returns Ok(true) when a task was processed, Ok(false) when the set was
empty (causes the caller to break), and Err to abort. The two call sites
become a one-liner each.
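To illustrate the Ok(true) / Ok(false) / Err contract, here is a synchronous analogue built on std threads and channels rather than tokio's JoinSet and mpsc. It is not the helper from this commit: the request_id parameter is omitted, and the error variants and the `Vec<String>` payload are hypothetical.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::Sender;
use std::thread::JoinHandle;

/// Hypothetical error type for the sketch.
#[derive(Debug, PartialEq)]
pub enum ParserError {
    TaskPanicked,
    ReceiverClosed,
}

/// Wait for one pending task, forward its records, and update the count.
/// Ok(true): a task was processed. Ok(false): nothing left, caller breaks.
/// Err: abort parsing.
pub fn drain_one_task(
    tasks: &mut VecDeque<JoinHandle<Vec<String>>>,
    result_tx: &Sender<Vec<String>>,
    total: &mut usize,
) -> Result<bool, ParserError> {
    let handle = match tasks.pop_front() {
        Some(handle) => handle,
        None => return Ok(false),
    };
    let records = handle.join().map_err(|_| ParserError::TaskPanicked)?;
    *total += records.len();
    result_tx
        .send(records)
        .map_err(|_| ParserError::ReceiverClosed)?;
    Ok(true)
}
```

Both call sites then reduce to a loop of the form `while drain_one_task(...)? {}`, optionally bounded by the concurrency limit.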
Instead of skipping the first line of an S3 VPC flow log file and
relying on the pre-parsed fields from FlowLogConfig, parse the header
line itself to build the ParsedFields for that file.

The header line is the first non-comment line and contains
space-separated field names, e.g.:

  version account-id interface-id srcaddr dstaddr srcport dstport ...

parse_log_lines() now:
1. Skips any leading '#' comment lines.
2. Treats the next line as the column header and builds a
   Vec<ParsedField> from it using get_field_type() for type assignment.
3. Passes the remaining lines as data to the existing parse_line() path.
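
The three steps above can be sketched as a small splitting helper. `split_header_and_data` is a hypothetical name used only for illustration; the actual parse_log_lines() does more (field typing, record emission).

```rust
/// Skip leading '#' comment lines, then return the header line and the
/// remaining data lines. Returns None when no header line is present.
pub fn split_header_and_data<'a>(lines: &'a [&'a str]) -> Option<(&'a str, &'a [&'a str])> {
    // Index of the first line that is not a comment: the column header.
    let header_idx = lines
        .iter()
        .position(|line| !line.trim_start().starts_with('#'))?;
    Some((lines[header_idx], &lines[header_idx + 1..]))
}
```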

This is more robust than relying on EC2 API metadata because the header
in the file is authoritative for the exact columns present. FlowLogConfig
is still required (to gate VpcLog detection and supply flow-log tags),
but its parsed_fields field is no longer consulted for S3 files.

Also re-exports get_field_type from the flowlogs crate so s3record.rs
can use it without reaching into the private cache module.
Move header-to-ParsedField conversion into parse/vpclog.rs as a new
pub(crate) function:

  pub(crate) fn parse_vpclog_header(header: &str) -> Vec<ParsedField>

It splits the header on whitespace and calls get_field_type() for each
token, identical to what parse_log_format() does for the ${field}
format string used by CloudWatch-delivered logs.
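
A sketch of what that conversion might look like. The `FieldType` variants, the `ParsedField` layout, and the type table inside this stand-in `get_field_type()` are assumptions for illustration; the crate's real definitions are authoritative.

```rust
/// Hypothetical field types for the sketch.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum FieldType {
    Int32,
    Int64,
    String,
}

/// Hypothetical layout of a parsed header column.
#[derive(Debug, Clone, PartialEq)]
pub struct ParsedField {
    pub name: String,
    pub field_type: FieldType,
}

/// Stand-in type lookup covering a few well-known flow log fields.
/// Unknown names fall back to String.
fn get_field_type(name: &str) -> FieldType {
    match name {
        "version" | "srcport" | "dstport" | "protocol" => FieldType::Int32,
        "packets" | "bytes" | "start" | "end" => FieldType::Int64,
        _ => FieldType::String,
    }
}

/// Split the header on whitespace and assign a type to each column.
pub(crate) fn parse_vpclog_header(header: &str) -> Vec<ParsedField> {
    header
        .split_whitespace()
        .map(|name| ParsedField {
            name: name.to_string(),
            field_type: get_field_type(name),
        })
        .collect()
}
```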

s3record.rs now calls parse_vpclog_header() on lines.split_first() and
treats the returned slice as data lines — no comment-line handling,
no get_field_type import, no inline field-building logic.

Remove the comment-line test (VPC flow log files have no comment lines)
and add four unit tests in vpclog.rs covering types, parity with
parse_log_format, unknown-field default, and empty input.
The call site was doing line.to_string() to convert the &str slice from
data_lines into an owned String before passing it to parse_line, which
then cloned or moved that String into sub-functions. This caused an
allocation per line that was never needed at that point.

parse_line now takes line: &str. Each arm calls .to_string() exactly
once, at the point where ownership is actually required:

  VpcLog:   set_body_text(line.to_string()) and
            parse_vpclog_to_map(line.to_string(), ...)
  Json:     parse_json_to_map(line.to_string()), and
            set_body_text(line.to_string()) on the error path only
  Unknown:  parse_json_to_map(line.to_string()) for the JSON branch, or
            set_body_text(line.to_string()) for the plain-text branch

The Json arm also loses a redundant intermediate clone: it previously
did parse_json_to_map(line.clone()).map_err(|e| (e, line.clone())) to
carry the raw string through the error, then pulled it back out of the
tuple. It now calls parse_json_to_map(line.to_string()) directly and
converts line on the error path instead.
The function only needs an owned String on the field-count mismatch
error path, where the raw input is stored in RecordParserError.
The integer-parse error path already holds an owned String (a clone of
the field value from field_values). parse_vpclog_fields already took
&str internally.

Changing the signature to &str:
- s3logs/s3record.rs: line.to_string() on the parse_vpclog_to_map call
  is removed; the &str from parse_line is passed directly, so the VpcLog
  arm now allocates exactly one String (for set_body_text) on the happy
  path instead of two.
- cwlogs/record_parser.rs: message.clone() becomes &message, dropping
  the clone entirely; message is still moved into
  ParsedMessage::PlainText on the success path as before.
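
The borrow-first pattern from this change, reduced to a sketch. The real function works on ParsedField values and typed attributes; here the signature is simplified to plain field names, and the `FieldCountMismatch` variant is a hypothetical shape for RecordParserError.

```rust
use std::collections::HashMap;

/// Hypothetical error shape: the raw input is only owned here.
#[derive(Debug, PartialEq)]
pub enum RecordParserError {
    FieldCountMismatch {
        expected: usize,
        found: usize,
        raw: String,
    },
}

/// Takes the line by reference. The whole line is copied into a String
/// only on the field-count mismatch path, where the error must own it.
pub fn parse_vpclog_to_map(
    line: &str,
    field_names: &[&str],
) -> Result<HashMap<String, String>, RecordParserError> {
    let values: Vec<&str> = line.split_whitespace().collect();
    if values.len() != field_names.len() {
        return Err(RecordParserError::FieldCountMismatch {
            expected: field_names.len(),
            found: values.len(),
            raw: line.to_string(), // the only copy of the full line
        });
    }
    // Happy path: only the individual keys and values are allocated.
    Ok(field_names
        .iter()
        .zip(values)
        .map(|(name, value)| ((*name).to_string(), value.to_string()))
        .collect())
}
```

Callers that already hold a `String` pass `&message` and keep ownership for whatever comes next.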
Both functions only needed an owned String to store in RecordParserError
on their error paths; the actual parsing (serde_json::from_str,
parse_keyvalue_pairs) already worked on &str internally.

parse_json_to_map(&str):
  - serde_json::from_str(msg) directly, no &-deref needed
  - .to_string() only on the two error paths (non-object JSON, parse
    failure) that move the raw input into RecordParserError

parse_keyvalue_to_map(&str):
  - parse_keyvalue_pairs(input) directly, the & was already there
  - .to_string() only on the empty-pairs error path

Call site changes:
  cwlogs/record_parser.rs: parse_message() passes &message to all three
    parse_json_to_map / parse_keyvalue_to_map call sites; message is
    still moved into ParsedMessage::PlainText / Error as before on the
    paths that need ownership
  s3logs/s3record.rs: line.to_string() on the two parse_json_to_map
    calls in parse_line() is removed; line (&str) is passed directly,
    with .to_string() remaining only on set_body_text error paths where
    ownership is genuinely needed
now_nanos,
log_entry.timestamp,
vec![string_kv("cloudwatch.id", log_entry.id)],
);

Somewhat unrelated, but this is a cleanup I noticed: cloudwatch.id is never empty in this path.

@mheffner mheffner marked this pull request as ready for review March 11, 2026 20:09
@mheffner mheffner requested a review from rjenkins March 11, 2026 20:09