From bcd266fd588668bac37fb5cd9d6e1e4697d19252 Mon Sep 17 00:00:00 2001 From: Aleksander <170264518+t-aleksander@users.noreply.github.com> Date: Mon, 19 Jan 2026 12:07:49 +0100 Subject: [PATCH] send logs from proxy --- Cargo.lock | 7 +++-- Cargo.toml | 1 + src/grpc.rs | 10 +------ src/http.rs | 28 +++++++------------- src/lib.rs | 4 +++ src/logging.rs | 71 +++++++++++++++++++++++++++++++++++++++++++++++--- src/main.rs | 11 ++++++-- src/setup.rs | 34 +++++++++++++++++++++--- 8 files changed, 127 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 96128fbf..22718c5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -390,12 +390,14 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.42" +version = "0.4.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" +checksum = "fac4744fb15ae8337dc853fee7fb3f4e48c0fbaa23d0afe49c447b4fab126118" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", + "wasm-bindgen", "windows-link", ] @@ -667,6 +669,7 @@ dependencies = [ "axum-client-ip", "axum-extra", "base64", + "chrono", "clap", "defguard_certs", "defguard_version", diff --git a/Cargo.toml b/Cargo.toml index 76154ffe..36f5765d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ base64 = "0.22" tower = "0.5" futures-util = "0.3" ammonia = "4.1.1" +chrono = "0.4" [build-dependencies] tonic-prost-build = "0.14" diff --git a/src/grpc.rs b/src/grpc.rs index 3dd2c21d..fcb4feea 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -24,7 +24,6 @@ use tracing::Instrument; use crate::{ error::ApiError, - http::GRPC_SERVER_RESTART_CHANNEL, proto::{core_request, core_response, proxy_server, CoreRequest, CoreResponse, DeviceInfo}, MIN_CORE_VERSION, VERSION, }; @@ -45,7 +44,6 @@ pub(crate) struct ProxyServer { pub(crate) connected: Arc, pub(crate) core_version: Arc>>, config: Arc>>, - setup_in_progress: Arc, } impl ProxyServer { @@ -59,7 +57,6 @@ impl ProxyServer { connected: Arc::new(AtomicBool::new(false)), core_version: Arc::new(Mutex::new(None)), config: Arc::new(Mutex::new(None)), - setup_in_progress: Arc::new(AtomicBool::new(false)), } } @@ -118,11 +115,7 @@ impl ProxyServer { builder .add_service(versioned_service) - .serve_with_shutdown(addr, async move { - let mut rx_lock = GRPC_SERVER_RESTART_CHANNEL.1.lock().await; - rx_lock.recv().await; - info!("Shutting down gRPC server for restart..."); - }) + .serve(addr) .await .map_err(|err| { error!("gRPC server error: {err}"); @@ -190,7 +183,6 @@ impl Clone for ProxyServer { connected: Arc::clone(&self.connected), core_version: Arc::clone(&self.core_version), config: Arc::clone(&self.config), - setup_in_progress: Arc::clone(&self.setup_in_progress), } } } diff --git a/src/http.rs b/src/http.rs index f18d52d3..c1eedced 100644 --- a/src/http.rs +++ b/src/http.rs @@ -3,7 +3,7 @@ use std::{ fs::read_to_string, net::{IpAddr, Ipv4Addr, SocketAddr}, path::Path, - sync::{atomic::Ordering, Arc, LazyLock}, + sync::{atomic::Ordering, Arc}, time::Duration, }; @@ -20,11 +20,7 @@ use axum_extra::extract::cookie::Key; use clap::crate_version; use defguard_version::{server::DefguardVersionLayer, Version}; use serde::Serialize; -use tokio::{ - net::TcpListener, - sync::{oneshot, Mutex}, - task::JoinSet, -}; +use tokio::{net::TcpListener, sync::oneshot, task::JoinSet}; use tower_governor::{ governor::GovernorConfigBuilder, key_extractor::SmartIpKeyExtractor, GovernorLayer, }; @@ -40,7 +36,7 @@ use crate::{ grpc::{Configuration, ProxyServer}, handlers::{desktop_client_mfa, enrollment, password_reset, polling}, setup::ProxySetupServer, - CommsChannel, VERSION, + LogsReceiver, VERSION, }; pub(crate) static ENROLLMENT_COOKIE_NAME: &str = "defguard_proxy"; @@ -50,13 +46,8 @@ const DEFGUARD_CORE_VERSION_HEADER: &str = "defguard-core-version"; const RATE_LIMITER_CLEANUP_PERIOD: Duration = Duration::from_secs(60); const X_FORWARDED_FOR: &str = "x-forwarded-for"; const X_POWERED_BY: &str = "x-powered-by"; -const GRPC_CERT_NAME: &str = "proxy_grpc_cert.pem"; -const GRPC_KEY_NAME: &str = "proxy_grpc_key.pem"; - -pub static GRPC_SERVER_RESTART_CHANNEL: LazyLock> = LazyLock::new(|| { - let (tx, rx) = tokio::sync::mpsc::channel(100); - (Arc::new(Mutex::new(tx)), Arc::new(Mutex::new(rx))) -}); +pub(crate) const GRPC_CERT_NAME: &str = "proxy_grpc_cert.pem"; +pub(crate) const GRPC_KEY_NAME: &str = "proxy_grpc_key.pem"; #[derive(Clone)] pub(crate) struct AppState { @@ -173,7 +164,7 @@ async fn powered_by_header(mut response: Response) -> Response { response } -pub async fn run_server(config: Config) -> anyhow::Result<()> { +pub async fn run_server(config: Config, logs_rx: LogsReceiver) -> anyhow::Result<()> { info!("Starting Defguard Proxy server"); debug!("Using config: {config:?}"); @@ -193,7 +184,7 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> { let server_clone = grpc_server.clone(); - let setup_server = ProxySetupServer::new(); + let setup_server = ProxySetupServer::new(logs_rx.clone()); // Start gRPC server. // TODO: Wait with spawning the HTTP server until gRPC server is ready. @@ -205,6 +196,7 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> { } loop { + info!("Starting gRPC server..."); let server_to_run = server_clone.clone(); if let (Some(cert), Some(key)) = ( @@ -219,11 +211,11 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> { } else if !server_clone.setup_completed() { // Only attempt setup if not already configured info!( - "No gRPC TLS certificates found at {}, new certificates will be generated", + "No gRPC TLS certificates found at {}, new certificates will be obtained during setup", cert_dir.display() ); let configuration = setup_server - .await_setup(SocketAddr::new( + .await_initial_setup(SocketAddr::new( config .grpc_bind_address .unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)), diff --git a/src/lib.rs b/src/lib.rs index a9e22f5c..c87e0274 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,8 @@ use std::sync::Arc; use defguard_version::Version; use tokio::sync::mpsc; +use crate::proto::LogEntry; + pub mod assets; pub mod config; mod enterprise; @@ -27,3 +29,5 @@ type CommsChannel = ( Arc>>, Arc>>, ); + +type LogsReceiver = Arc>>; diff --git a/src/logging.rs b/src/logging.rs index 13929a0b..44c0c88c 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -8,6 +8,7 @@ use defguard_version::{ ComponentInfo, DefguardVersionError, Version, }; use log::LevelFilter; +use tokio::sync::mpsc::Sender; use tracing::{Event, Level, Subscriber}; use tracing_subscriber::{ fmt::{ @@ -19,18 +20,25 @@ use tracing_subscriber::{ layer::SubscriberExt, registry::LookupSpan, util::SubscriberInitExt, - EnvFilter, + EnvFilter, Layer, }; +use crate::proto::LogEntry; + // Initializes tracing with the specified log level and version information. // Allows fine-grained filtering with `EnvFilter` directives. // The directives are read from `DEFGUARD_PROXY_LOG_FILTER` env variable. // For more info read: -pub fn init_tracing(own_version: Version, level: &LevelFilter) -> Result<(), DefguardVersionError> { +pub fn init_tracing( + own_version: Version, + level: &LevelFilter, + logs_tx: Sender, +) -> Result<(), DefguardVersionError> { tracing_subscriber::registry() .with( - EnvFilter::try_from_env("DEFGUARD_PROXY_LOG_FILTER") - .unwrap_or_else(|_| level.to_string().into()), + EnvFilter::try_from_env("DEFGUARD_PROXY_LOG_FILTER").unwrap_or_else(|_| { + format!("{level},h2=warn,h2::codec=off,tower=warn,hyper=warn").into() + }), ) .with(VersionFieldLayer) .with( @@ -38,6 +46,7 @@ pub fn init_tracing(own_version: Version, level: &LevelFilter) -> Result<(), Def .event_format(HttpVersionFormatter::new(own_version)) .fmt_fields(VersionFilteredFields), ) + .with(GrpcLogLayer::new(logs_tx)) .init(); info!("Tracing initialized"); @@ -157,3 +166,57 @@ where writeln!(versioned_writer) } } + +/// A tracing layer that sends log entries to a gRPC logs channel. +pub struct GrpcLogLayer { + logs_tx: Sender, +} + +impl GrpcLogLayer { + #[must_use] + pub const fn new(logs_tx: Sender) -> Self { + Self { logs_tx } + } +} + +impl Layer for GrpcLogLayer +where + S: Subscriber, +{ + fn on_event(&self, event: &Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) { + if self.logs_tx.is_closed() { + return; + } + + let mut visitor = LogVisitor::default(); + event.record(&mut visitor); + + let entry = LogEntry { + level: format!("{:?}", event.metadata().level()), + target: event.metadata().target().to_string(), + message: visitor.message, + timestamp: chrono::Utc::now().to_rfc3339(), + fields: visitor.fields, + }; + + // Drop the buffer overflow error for now + let _ = self.logs_tx.try_send(entry); + } +} + +#[derive(Default)] +struct LogVisitor { + message: String, + fields: std::collections::HashMap, +} + +impl tracing::field::Visit for LogVisitor { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + if field.name() == "message" { + self.message = format!("{value:?}"); + } else { + self.fields + .insert(field.name().to_string(), format!("{value:?}")); + } + } +} diff --git a/src/main.rs b/src/main.rs index 0c3ef877..a7e06d84 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,8 @@ +use std::sync::Arc; + use defguard_proxy::{config::get_config, http::run_server, logging::init_tracing, VERSION}; use defguard_version::Version; +use tokio::sync::mpsc; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -8,13 +11,17 @@ async fn main() -> anyhow::Result<()> { dotenvy::dotenv().ok(); } + // TODO: The channel size may need to be adjusted or some other approach should be used + // to avoid dropping log messages. + let (logs_tx, logs_rx) = mpsc::channel(200); + let config = get_config()?; - init_tracing(Version::parse(VERSION)?, &config.log_level)?; + init_tracing(Version::parse(VERSION)?, &config.log_level, logs_tx)?; // read config from env tracing::info!("Starting ... version v{}", VERSION); // run API web server - run_server(config).await?; + run_server(config, Arc::new(tokio::sync::Mutex::new(logs_rx))).await?; Ok(()) } diff --git a/src/setup.rs b/src/setup.rs index 437ac81f..51189d3b 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -11,13 +11,14 @@ use defguard_version::{ DefguardComponent, Version, }; use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{transport::Server, Request, Response, Status}; use crate::{ error::ApiError, grpc::Configuration, - proto::{proxy_setup_server, DerPayload, InitialSetupInfo}, - CommsChannel, MIN_CORE_VERSION, VERSION, + proto::{proxy_setup_server, DerPayload, InitialSetupInfo, LogEntry}, + CommsChannel, LogsReceiver, MIN_CORE_VERSION, VERSION, }; static SETUP_CHANNEL: LazyLock>> = LazyLock::new(|| { @@ -31,6 +32,7 @@ static SETUP_CHANNEL: LazyLock>> = LazyLock:: pub(crate) struct ProxySetupServer { setup_in_progress: Arc, key_pair: Arc>>, + logs_rx: LogsReceiver, } impl Clone for ProxySetupServer { @@ -38,15 +40,17 @@ impl Clone for ProxySetupServer { Self { setup_in_progress: Arc::clone(&self.setup_in_progress), key_pair: Arc::clone(&self.key_pair), + logs_rx: Arc::clone(&self.logs_rx), } } } impl ProxySetupServer { - pub fn new() -> Self { + pub fn new(logs_rx: LogsReceiver) -> Self { Self { setup_in_progress: Arc::new(AtomicBool::new(false)), key_pair: Arc::new(Mutex::new(None)), + logs_rx, } } @@ -54,7 +58,7 @@ impl ProxySetupServer { /// Spins up a dedicated temporary gRPC server over HTTP to handle the setup process. /// The setup process involves generating a CSR, sending it to Core and receiving signed TLS certificates. /// Returns the received gRPC configuration (locally generated key pair and remotely signed certificate) upon successful setup. - pub(crate) async fn await_setup( + pub(crate) async fn await_initial_setup( &self, addr: SocketAddr, ) -> Result { @@ -109,6 +113,8 @@ impl ProxySetupServer { #[tonic::async_trait] impl proxy_setup_server::ProxySetup for ProxySetupServer { + type ReadLogsStream = UnboundedReceiverStream>; + async fn start( &self, request: Request, @@ -213,4 +219,24 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer { Ok(Response::new(())) } + + async fn read_logs( + &self, + _request: Request<()>, + ) -> Result, Status> { + let logs_rx = self.logs_rx.clone(); + + let (tx, rx) = mpsc::unbounded_channel(); + + tokio::spawn(async move { + while let Some(log_entry) = logs_rx.lock().await.recv().await { + if let Err(e) = tx.send(Ok(log_entry)) { + debug!("Failed to send log entry to gRPC stream: receiver disconnected ({e})",); + break; + } + } + }); + + Ok(Response::new(UnboundedReceiverStream::new(rx))) + } }