Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 66 additions & 188 deletions src/parse/cwlogs.rs → src/cwlogs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@ use regex::Regex;
use std::sync::LazyLock;
use std::{collections::HashMap, sync::Arc};

use aws_lambda_events::cloudwatch_logs::{LogEntry, LogsEvent};
use aws_lambda_events::cloudwatch_logs::LogsEvent;
use opentelemetry_proto::tonic::{
common::v1::{AnyValue, InstrumentationScope, KeyValue, any_value::Value},
common::v1::InstrumentationScope,
logs::v1::{ResourceLogs, ScopeLogs},
resource::v1::Resource,
};
use tracing::debug;

use crate::parse::record_parser::RecordLogEntry;
use crate::parse::platform::{LogPlatform, ParserError};
use crate::parse::utils::string_kv;
use crate::{
aws_attributes::AwsAttributes,
flowlogs::{FlowLogManager, ParsedFields},
parse::record_parser::RecordParser,
tags::TagManager,
};

pub mod record_parser;

use record_parser::RecordParser;

/// Parser handles the conversion of AWS CloudWatch Logs events into OpenTelemetry ResourceLogs
pub struct Parser<'a> {
aws_attributes: &'a AwsAttributes,
Expand All @@ -43,28 +47,6 @@ impl<'a> Parser<'a> {

/// Parse an AWS CloudWatch Logs event and return ResourceLogs
pub async fn parse(&mut self, logs_event: LogsEvent) -> Result<Vec<ResourceLogs>, ParserError> {
debug!(
request_id = %self.request_id,
"Starting to parse CloudWatch Logs event"
);

// Parse the CloudWatch Logs event into ResourceLogs
let resource_logs = self.parse_logs_event(logs_event).await?;

debug!(
request_id = %self.request_id,
count = resource_logs.len(),
"Successfully parsed CloudWatch Logs event into ResourceLogs"
);

Ok(resource_logs)
}

/// Internal method to parse the LogsEvent
async fn parse_logs_event(
&mut self,
logs_event: LogsEvent,
) -> Result<Vec<ResourceLogs>, ParserError> {
let mut resource_logs_list = Vec::new();

debug!(
Expand Down Expand Up @@ -106,73 +88,39 @@ impl<'a> Parser<'a> {

// Build base attributes
let mut attributes = vec![
KeyValue {
key: "cloud.provider".to_string(),
value: Some(AnyValue {
value: Some(Value::StringValue("aws".to_string())),
}),
},
KeyValue {
key: "cloud.region".to_string(),
value: Some(AnyValue {
value: Some(Value::StringValue(self.aws_attributes.region.clone())),
}),
},
KeyValue {
key: "cloud.account.id".to_string(),
value: Some(AnyValue {
value: Some(Value::StringValue(self.aws_attributes.account_id.clone())),
}),
},
KeyValue {
key: "cloudwatch.log.group.name".to_string(),
value: Some(AnyValue {
value: Some(Value::StringValue(log_data.log_group)),
}),
},
KeyValue {
key: "cloudwatch.log.stream.name".to_string(),
value: Some(AnyValue {
value: Some(Value::StringValue(log_data.log_stream)),
}),
},
string_kv("cloud.provider", "aws"),
string_kv("cloud.region", self.aws_attributes.region.clone()),
string_kv("cloud.account.id", self.aws_attributes.account_id.clone()),
string_kv("cloudwatch.log.group.name", log_data.log_group),
string_kv("cloudwatch.log.stream.name", log_data.log_stream),
];

// Add cloud.platform attribute based on detected platform
if log_platform != LogPlatform::Unknown {
attributes.push(KeyValue {
key: "cloud.platform".to_string(),
value: Some(AnyValue {
value: Some(Value::StringValue(log_platform.as_str().to_string())),
}),
});
attributes.push(string_kv("cloud.platform", log_platform.as_str()));
}

// Add CloudWatch log group tags as resource attributes
for (tag_key, tag_value) in log_group_tags {
attributes.push(KeyValue {
key: format!("cloudwatch.log.tags.{}", tag_key),
value: Some(AnyValue {
value: Some(Value::StringValue(tag_value)),
}),
});
attributes.push(string_kv(
&format!("cloudwatch.log.tags.{}", tag_key),
tag_value,
));
}

// Add EC2 Flow Log tags as resource attributes
for (tag_key, tag_value) in flow_log_tags {
attributes.push(KeyValue {
key: format!("ec2.flow-logs.tags.{}", tag_key),
value: Some(AnyValue {
value: Some(Value::StringValue(tag_value)),
}),
});
attributes.push(string_kv(
&format!("ec2.flow-logs.tags.{}", tag_key),
tag_value,
));
}

let rec_parser = RecordParser::new(log_platform, parser_type, flow_log_parsed_fields);
let log_records = log_data
.log_events
.into_iter()
.map(|log| rec_parser.parse(now_nanos, log.into()))
.map(|log| rec_parser.parse(now_nanos, log))
.collect();

let resource_logs = ResourceLogs {
Expand All @@ -183,20 +131,16 @@ impl<'a> Parser<'a> {
}),
scope_logs: vec![ScopeLogs {
scope: Some(InstrumentationScope {
name: "rotel-lambda-forwarder".to_string(),
version: "0.0.1".to_string(), // TODO
attributes: vec![KeyValue {
key: "aws.lambda.invoked_arn".to_string(),
value: Some(AnyValue {
value: Some(Value::StringValue(
self.aws_attributes.invoked_function_arn.clone(),
)),
}),
}],
name: env!("CARGO_PKG_NAME").to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
attributes: vec![string_kv(
"aws.lambda.invoked_arn",
self.aws_attributes.invoked_function_arn.clone(),
)],
dropped_attributes_count: 0,
}),
log_records,
schema_url: "".to_string(),
schema_url: String::new(),
}],
schema_url: String::new(),
};
Expand All @@ -207,54 +151,45 @@ impl<'a> Parser<'a> {
}
}

impl From<LogEntry> for RecordLogEntry {
fn from(value: LogEntry) -> Self {
RecordLogEntry::new(Some(value.id), value.timestamp, value.message)
}
}

/// Detect AWS platform from log group name
/// Represents the AWS platform/service that generated the logs
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogPlatform {
    /// Rendered as `aws_eks`.
    Eks,
    /// Rendered as `aws_ecs`.
    Ecs,
    /// Rendered as `aws_rds`.
    Rds,
    /// Rendered as `aws_lambda`.
    Lambda,
    /// Rendered as `aws_codebuild`.
    Codebuild,
    /// Rendered as `aws_cloudtrail`.
    Cloudtrail,
    /// Rendered as `aws_vpc_flow_log`.
    VpcFlowLog,
    /// Fallback variant; rendered as `aws_unknown`.
    Unknown,
}

impl LogPlatform {
    /// Gives the static identifier for this platform, as used for the
    /// `cloud.platform` attribute value.
    pub fn as_str(&self) -> &'static str {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match *self {
            Self::Eks => "aws_eks",
            Self::Ecs => "aws_ecs",
            Self::Rds => "aws_rds",
            Self::Lambda => "aws_lambda",
            Self::Codebuild => "aws_codebuild",
            Self::Cloudtrail => "aws_cloudtrail",
            Self::VpcFlowLog => "aws_vpc_flow_log",
            Self::Unknown => "aws_unknown",
        }
    }
}

/// Represents the type of parser to use for log entries
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParserType {
pub(crate) enum ParserType {
Json,
KeyValue,
VpcLog,
#[default]
Unknown,
}

/// Matches log stream names that start with twelve digits immediately followed by
/// `_CloudTrail_`. The pattern is anchored at the start only, so any suffix is allowed.
///
/// Compiled lazily on first use; the `unwrap` can only panic if the literal pattern
/// itself is invalid.
// NOTE(review): the 12-digit prefix is presumably the AWS account ID — confirm.
static CLOUDTRAIL_REGEX: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^\d{12}_CloudTrail_").unwrap());

/// Classifies a log source into a `LogPlatform` from its log group and stream names.
///
/// A stream name matching `CLOUDTRAIL_REGEX` takes precedence and yields `Cloudtrail`.
/// Otherwise the group name must begin with `/aws/`, and the path segment right after
/// that prefix (which must be terminated by `/`) selects the platform. Everything else
/// is reported as `Unknown`.
fn detect_log_platform(log_group_name: &str, log_stream_name: &str) -> LogPlatform {
    if CLOUDTRAIL_REGEX.is_match(log_stream_name) {
        return LogPlatform::Cloudtrail;
    }

    // First path segment after "/aws/", present only when it is followed by a '/'.
    // This is the same condition as `rest.starts_with("<service>/")`.
    let service = log_group_name
        .strip_prefix("/aws/")
        .and_then(|rest| rest.split_once('/'))
        .map(|(segment, _)| segment);

    match service {
        Some("eks") => LogPlatform::Eks,
        Some("ecs") => LogPlatform::Ecs,
        Some("rds") => LogPlatform::Rds,
        Some("lambda") => LogPlatform::Lambda,
        Some("codebuild") => LogPlatform::Codebuild,
        _ => LogPlatform::Unknown,
    }
}

impl<'a> Parser<'a> {
/// Detects the log platform and parser type based on log group and stream names
/// Detects the log platform and parser type based on log group and stream names.
/// Returns (platform, parser_type, optional_flow_log_parsed_fields, flow_log_tags)
async fn detect_log_type(
&mut self,
Expand All @@ -274,7 +209,6 @@ impl<'a> Parser<'a> {
"Detected EC2 Flow Log"
);

// Extract parsed fields if successful, None if parsing failed or not attempted
let parsed_fields = flow_log_config.parsed_fields.as_ref().cloned();

return (
Expand All @@ -288,7 +222,7 @@ impl<'a> Parser<'a> {
// Otherwise, detect the platform normally
let platform = detect_log_platform(log_group_name, log_stream_name);

// Then, determine the parser type based on platform and log stream name
// Determine the parser type based on platform and log stream name
let parser_type = match platform {
LogPlatform::Eks => {
if log_stream_name.starts_with("authenticator-") {
Expand All @@ -305,58 +239,12 @@ impl<'a> Parser<'a> {
}
}

/// Matches log stream names that start with twelve digits immediately followed by
/// `_CloudTrail_`. The pattern is anchored at the start only, so any suffix is allowed.
///
/// Compiled lazily on first use; the `unwrap` can only panic if the literal pattern
/// itself is invalid.
// NOTE(review): the 12-digit prefix is presumably the AWS account ID — confirm.
static CLOUDTRAIL_REGEX: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^\d{12}_CloudTrail_").unwrap());

/// Classifies a log source into a `LogPlatform` from its log group and stream names.
///
/// A stream name matching `CLOUDTRAIL_REGEX` takes precedence and yields `Cloudtrail`.
/// Otherwise the group name must begin with `/aws/`, and the path segment right after
/// that prefix (which must be terminated by `/`) selects the platform. Everything else
/// is reported as `Unknown`.
fn detect_log_platform(log_group_name: &str, log_stream_name: &str) -> LogPlatform {
    if CLOUDTRAIL_REGEX.is_match(log_stream_name) {
        return LogPlatform::Cloudtrail;
    }

    // First path segment after "/aws/", present only when it is followed by a '/'.
    // This is the same condition as `rest.starts_with("<service>/")`.
    let service = log_group_name
        .strip_prefix("/aws/")
        .and_then(|rest| rest.split_once('/'))
        .map(|(segment, _)| segment);

    match service {
        Some("eks") => LogPlatform::Eks,
        Some("ecs") => LogPlatform::Ecs,
        Some("rds") => LogPlatform::Rds,
        Some("lambda") => LogPlatform::Lambda,
        Some("codebuild") => LogPlatform::Codebuild,
        _ => LogPlatform::Unknown,
    }
}

/// Errors that can occur during parsing.
///
/// `Display` text for each variant comes from its `#[error(...)]` attribute; for the
/// tuple variants the carried `String` is interpolated as `{0}`.
#[derive(Debug, thiserror::Error)]
pub enum ParserError {
/// CloudWatch Logs data could not be decoded; the `String` holds the detail.
#[error("Failed to decode CloudWatch Logs data: {0}")]
DecodeError(String),

/// CloudWatch Logs data could not be decompressed; the `String` holds the detail.
#[error("Failed to decompress CloudWatch Logs data: {0}")]
DecompressionError(String),

/// JSON parsing failed; the `String` holds the detail.
#[error("Failed to parse JSON: {0}")]
JsonParseError(String),

/// The log did not have the expected format; the `String` holds the detail.
#[error("Invalid log format: {0}")]
FormatParseError(String),

/// An EC2 flow log format could not be parsed. Carries no extra detail.
#[error("Unable to parse EC2 flow log format")]
FlowLogFormatError,
}

#[cfg(test)]
mod tests {
use aws_lambda_events::cloudwatch_logs::LogEntry;

use super::*;
use aws_config::BehaviorVersion;
use aws_lambda_events::cloudwatch_logs::LogEntry;
use opentelemetry_proto::tonic::common::v1::any_value::Value;

#[tokio::test]
async fn test_parse_empty_event() {
Expand All @@ -381,46 +269,36 @@ mod tests {

assert!(result.is_ok());
let resource_logs = result.unwrap();
// Implementation creates one ResourceLogs structure
assert_eq!(resource_logs.len(), 1);
}

#[tokio::test]
async fn test_parse_eks_authenticator_log() {
let log_msg = r#"time="2025-12-24T19:48:32Z" level=info msg="access granted" arn="arn:aws:iam::927209226484:role/AWSWesleyClusterManagerLambda-Add-AddonManagerRole-1CRTQUJF13T5U" client="127.0.0.1:54812" groups="[]" method=POST path=/authenticate stsendpoint=sts.us-east-1.amazonaws.com uid="aws-iam-authenticator:927209226484:AROA5PYP2AD2FVXU23CA6" username="eks:addon-manager""#;

// Test parsing the log entry
let mut log_entry = LogEntry::default();
log_entry.id = "test-id".to_string();
log_entry.timestamp = 1000;
log_entry.message = log_msg.to_string();

let rec_parser = RecordParser::new(LogPlatform::Eks, ParserType::KeyValue, None);
let log_record = rec_parser.parse(123456789, log_entry.into());
let log_record = rec_parser.parse(123456789, log_entry);

// Verify the log was parsed correctly
assert_eq!(log_record.severity_number, 9); // Info
assert_eq!(log_record.severity_text, "INFO");

// Verify body contains the msg field
assert!(log_record.body.is_some());
if let Some(body) = &log_record.body {
if let Some(Value::StringValue(s)) = &body.value {
assert_eq!(s, "access granted");
}
}

// Verify timestamp was parsed
assert!(log_record.time_unix_nano > 0);

// Verify attributes were extracted
let has_arn = log_record.attributes.iter().any(|kv| kv.key == "arn");
let has_method = log_record.attributes.iter().any(|kv| kv.key == "method");
let has_username = log_record.attributes.iter().any(|kv| kv.key == "username");

assert!(has_arn);
assert!(has_method);
assert!(has_username);
assert!(log_record.attributes.iter().any(|kv| kv.key == "arn"));
assert!(log_record.attributes.iter().any(|kv| kv.key == "method"));
assert!(log_record.attributes.iter().any(|kv| kv.key == "username"));
}

#[test]
Expand Down
Loading