Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ base64 = "0.22"
tower = "0.5"
futures-util = "0.3"
ammonia = "4.1.1"
chrono = "0.4"

[build-dependencies]
tonic-prost-build = "0.14"
Expand Down
10 changes: 1 addition & 9 deletions src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use tracing::Instrument;

use crate::{
error::ApiError,
http::GRPC_SERVER_RESTART_CHANNEL,
proto::{core_request, core_response, proxy_server, CoreRequest, CoreResponse, DeviceInfo},
MIN_CORE_VERSION, VERSION,
};
Expand All @@ -48,7 +47,6 @@ pub(crate) struct ProxyServer {
pub(crate) connected: Arc<AtomicBool>,
pub(crate) core_version: Arc<Mutex<Option<Version>>>,
config: Arc<Mutex<Option<Configuration>>>,
setup_in_progress: Arc<AtomicBool>,
}

impl ProxyServer {
Expand All @@ -63,7 +61,6 @@ impl ProxyServer {
connected: Arc::new(AtomicBool::new(false)),
core_version: Arc::new(Mutex::new(None)),
config: Arc::new(Mutex::new(None)),
setup_in_progress: Arc::new(AtomicBool::new(false)),
}
}

Expand Down Expand Up @@ -122,11 +119,7 @@ impl ProxyServer {

builder
.add_service(versioned_service)
.serve_with_shutdown(addr, async move {
let mut rx_lock = GRPC_SERVER_RESTART_CHANNEL.1.lock().await;
rx_lock.recv().await;
info!("Shutting down gRPC server for restart...");
})
.serve(addr)
.await
.map_err(|err| {
error!("gRPC server error: {err}");
Expand Down Expand Up @@ -195,7 +188,6 @@ impl Clone for ProxyServer {
core_version: Arc::clone(&self.core_version),
http_channel: self.http_channel.clone(),
config: Arc::clone(&self.config),
setup_in_progress: Arc::clone(&self.setup_in_progress),
}
}
}
Expand Down
22 changes: 9 additions & 13 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
fs::read_to_string,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::Path,
sync::{atomic::Ordering, Arc, LazyLock},
sync::{atomic::Ordering, Arc},
time::Duration,
};

Expand Down Expand Up @@ -40,7 +40,7 @@ use crate::{
grpc::{Configuration, ProxyServer},
handlers::{desktop_client_mfa, enrollment, password_reset, polling},
setup::ProxySetupServer,
CommsChannel, VERSION,
LogsReceiver, VERSION,
};

pub(crate) static ENROLLMENT_COOKIE_NAME: &str = "defguard_proxy";
Expand All @@ -50,13 +50,8 @@ const DEFGUARD_CORE_VERSION_HEADER: &str = "defguard-core-version";
const RATE_LIMITER_CLEANUP_PERIOD: Duration = Duration::from_secs(60);
const X_FORWARDED_FOR: &str = "x-forwarded-for";
const X_POWERED_BY: &str = "x-powered-by";
const GRPC_CERT_NAME: &str = "proxy_grpc_cert.pem";
const GRPC_KEY_NAME: &str = "proxy_grpc_key.pem";

pub static GRPC_SERVER_RESTART_CHANNEL: LazyLock<CommsChannel<()>> = LazyLock::new(|| {
let (tx, rx) = tokio::sync::mpsc::channel(100);
(Arc::new(Mutex::new(tx)), Arc::new(Mutex::new(rx)))
});
pub(crate) const GRPC_CERT_NAME: &str = "proxy_grpc_cert.pem";
pub(crate) const GRPC_KEY_NAME: &str = "proxy_grpc_key.pem";

#[derive(Clone)]
pub(crate) struct AppState {
Expand Down Expand Up @@ -173,7 +168,7 @@ async fn powered_by_header<B>(mut response: Response<B>) -> Response<B> {
response
}

pub async fn run_server(config: Config) -> anyhow::Result<()> {
pub async fn run_server(config: Config, logs_rx: LogsReceiver) -> anyhow::Result<()> {
info!("Starting Defguard Proxy server");
debug!("Using config: {config:?}");

Expand All @@ -188,7 +183,7 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> {

let server_clone = grpc_server.clone();

let setup_server = ProxySetupServer::new();
let setup_server = ProxySetupServer::new(logs_rx.clone());

// Start gRPC server.
// TODO: Wait with spawning the HTTP server until gRPC server is ready.
Expand All @@ -200,6 +195,7 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> {
}

loop {
info!("Starting gRPC server...");
let server_to_run = server_clone.clone();

if let (Some(cert), Some(key)) = (
Expand All @@ -214,11 +210,11 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> {
} else if !server_clone.setup_completed() {
// Only attempt setup if not already configured
info!(
"No gRPC TLS certificates found at {}, new certificates will be generated",
"No gRPC TLS certificates found at {}, new certificates will be obtained during setup",
cert_dir.display()
);
let configuration = setup_server
.await_setup(SocketAddr::new(
.await_initial_setup(SocketAddr::new(
config
.grpc_bind_address
.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)),
Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::sync::Arc;
use defguard_version::Version;
use tokio::sync::mpsc;

use crate::proto::LogEntry;

pub mod assets;
pub mod config;
mod enterprise;
Expand All @@ -27,3 +29,5 @@ type CommsChannel<T> = (
Arc<tokio::sync::Mutex<mpsc::Sender<T>>>,
Arc<tokio::sync::Mutex<mpsc::Receiver<T>>>,
);

type LogsReceiver = Arc<tokio::sync::Mutex<mpsc::Receiver<LogEntry>>>;
71 changes: 67 additions & 4 deletions src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use defguard_version::{
ComponentInfo, DefguardVersionError, Version,
};
use log::LevelFilter;
use tokio::sync::mpsc::Sender;
use tracing::{Event, Level, Subscriber};
use tracing_subscriber::{
fmt::{
Expand All @@ -19,25 +20,33 @@ use tracing_subscriber::{
layer::SubscriberExt,
registry::LookupSpan,
util::SubscriberInitExt,
EnvFilter,
EnvFilter, Layer,
};

use crate::proto::LogEntry;

// Initializes tracing with the specified log level and version information.
// Allows fine-grained filtering with `EnvFilter` directives.
// The directives are read from `DEFGUARD_PROXY_LOG_FILTER` env variable.
// For more info read: <https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html>
pub fn init_tracing(own_version: Version, level: &LevelFilter) -> Result<(), DefguardVersionError> {
pub fn init_tracing(
own_version: Version,
level: &LevelFilter,
logs_tx: Sender<LogEntry>,
) -> Result<(), DefguardVersionError> {
tracing_subscriber::registry()
.with(
EnvFilter::try_from_env("DEFGUARD_PROXY_LOG_FILTER")
.unwrap_or_else(|_| level.to_string().into()),
EnvFilter::try_from_env("DEFGUARD_PROXY_LOG_FILTER").unwrap_or_else(|_| {
format!("{level},h2=warn,h2::codec=off,tower=warn,hyper=warn").into()
}),
)
.with(VersionFieldLayer)
.with(
fmt::layer()
.event_format(HttpVersionFormatter::new(own_version))
.fmt_fields(VersionFilteredFields),
)
.with(GrpcLogLayer::new(logs_tx))
.init();

info!("Tracing initialized");
Expand Down Expand Up @@ -157,3 +166,57 @@ where
writeln!(versioned_writer)
}
}

/// A tracing layer that sends log entries to a gRPC logs channel.
pub struct GrpcLogLayer {
logs_tx: Sender<LogEntry>,
}

impl GrpcLogLayer {
#[must_use]
pub const fn new(logs_tx: Sender<LogEntry>) -> Self {
Self { logs_tx }
}
}

impl<S> Layer<S> for GrpcLogLayer
where
S: Subscriber,
{
fn on_event(&self, event: &Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) {
if self.logs_tx.is_closed() {
return;
}

let mut visitor = LogVisitor::default();
event.record(&mut visitor);

let entry = LogEntry {
level: format!("{:?}", event.metadata().level()),
target: event.metadata().target().to_string(),
message: visitor.message,
timestamp: chrono::Utc::now().to_rfc3339(),
fields: visitor.fields,
};

// Drop the buffer overflow error for now
let _ = self.logs_tx.try_send(entry);
}
}

#[derive(Default)]
struct LogVisitor {
message: String,
fields: std::collections::HashMap<String, String>,
}

impl tracing::field::Visit for LogVisitor {
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
if field.name() == "message" {
self.message = format!("{value:?}");
} else {
self.fields
.insert(field.name().to_string(), format!("{value:?}"));
}
}
}
11 changes: 9 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::sync::Arc;

use defguard_proxy::{config::get_config, http::run_server, logging::init_tracing, VERSION};
use defguard_version::Version;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand All @@ -8,13 +11,17 @@ async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
}

// TODO: The channel size may need to be adjusted or some other approach should be used
// to avoid dropping log messages.
let (logs_tx, logs_rx) = mpsc::channel(200);

let config = get_config()?;
init_tracing(Version::parse(VERSION)?, &config.log_level)?;
init_tracing(Version::parse(VERSION)?, &config.log_level, logs_tx)?;
// read config from env
tracing::info!("Starting ... version v{}", VERSION);

// run API web server
run_server(config).await?;
run_server(config, Arc::new(tokio::sync::Mutex::new(logs_rx))).await?;

Ok(())
}
34 changes: 30 additions & 4 deletions src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ use defguard_version::{
DefguardComponent, Version,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{transport::Server, Request, Response, Status};

use crate::{
error::ApiError,
grpc::Configuration,
proto::{proxy_setup_server, DerPayload, InitialSetupInfo},
CommsChannel, MIN_CORE_VERSION, VERSION,
proto::{proxy_setup_server, DerPayload, InitialSetupInfo, LogEntry},
CommsChannel, LogsReceiver, MIN_CORE_VERSION, VERSION,
};

static SETUP_CHANNEL: LazyLock<CommsChannel<Option<Configuration>>> = LazyLock::new(|| {
Expand All @@ -31,30 +32,33 @@ static SETUP_CHANNEL: LazyLock<CommsChannel<Option<Configuration>>> = LazyLock::
pub(crate) struct ProxySetupServer {
setup_in_progress: Arc<AtomicBool>,
key_pair: Arc<Mutex<Option<defguard_certs::RcGenKeyPair>>>,
logs_rx: LogsReceiver,
}

impl Clone for ProxySetupServer {
fn clone(&self) -> Self {
Self {
setup_in_progress: Arc::clone(&self.setup_in_progress),
key_pair: Arc::clone(&self.key_pair),
logs_rx: Arc::clone(&self.logs_rx),
}
}
}

impl ProxySetupServer {
pub fn new() -> Self {
pub fn new(logs_rx: LogsReceiver) -> Self {
Self {
setup_in_progress: Arc::new(AtomicBool::new(false)),
key_pair: Arc::new(Mutex::new(None)),
logs_rx,
}
}

/// Await setup connection from Defguard Core and process it.
/// Spins up a dedicated temporary gRPC server over HTTP to handle the setup process.
/// The setup process involves generating a CSR, sending it to Core and receiving signed TLS certificates.
/// Returns the received gRPC configuration (locally generated key pair and remotely signed certificate) upon successful setup.
pub(crate) async fn await_setup(
pub(crate) async fn await_initial_setup(
&self,
addr: SocketAddr,
) -> Result<Configuration, anyhow::Error> {
Expand Down Expand Up @@ -109,6 +113,8 @@ impl ProxySetupServer {

#[tonic::async_trait]
impl proxy_setup_server::ProxySetup for ProxySetupServer {
type ReadLogsStream = UnboundedReceiverStream<Result<LogEntry, Status>>;

async fn start(
&self,
request: Request<InitialSetupInfo>,
Expand Down Expand Up @@ -213,4 +219,24 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer {

Ok(Response::new(()))
}

async fn read_logs(
&self,
_request: Request<()>,
) -> Result<Response<Self::ReadLogsStream>, Status> {
let logs_rx = self.logs_rx.clone();

let (tx, rx) = mpsc::unbounded_channel();

tokio::spawn(async move {
while let Some(log_entry) = logs_rx.lock().await.recv().await {
if let Err(e) = tx.send(Ok(log_entry)) {
debug!("Failed to send log entry to gRPC stream: receiver disconnected ({e})",);
break;
}
}
});

Ok(Response::new(UnboundedReceiverStream::new(rx)))
}
}
Loading