use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use tracing::{
field::{Field, Visit},
Level,
};
use tracing_subscriber::Layer;
type BoxError = Box<dyn std::error::Error>;
const WSHIM_LOGGING_ENDPOINT: &str = "https://workers-logging.cfdata.org/log";
#[derive(serde::Serialize)]
struct LogLine {
timestamp: u64,
log_level: &'static str,
#[serde(flatten)]
fields: HashMap<String, serde_json::Value>,
}
#[derive(serde::Serialize)]
struct SyslogPayloadLine {
message: LogLine,
priority: u8,
}
#[derive(serde::Serialize)]
struct SyslogPayload {
logs: Vec<SyslogPayloadLine>,
}
#[derive(Clone)]
pub struct SyslogLogger {
logging_token: String,
payload: Arc<Mutex<SyslogPayload>>,
}
impl SyslogLogger {
pub fn new(logging_token: &str) -> Self {
let payload = SyslogPayload { logs: vec![] };
SyslogLogger {
payload: Arc::new(Mutex::new(payload)),
logging_token: logging_token.to_owned(),
}
}
pub async fn flush(&self) -> Result<(), BoxError> {
let mut payload_mutex = self.payload.lock().unwrap();
if payload_mutex.logs.is_empty() {
return Ok(());
}
let payload = serde_json::to_string(&*payload_mutex)?;
let payload = wasm_bindgen::JsValue::from_str(&payload);
let mut headers = worker::Headers::new();
headers.set("Authorization", &format!("Bearer {}", self.logging_token))?;
let init = worker::RequestInit {
body: Some(payload),
headers,
method: worker::Method::Post,
..Default::default()
};
let req = worker::Request::new_with_init(WSHIM_LOGGING_ENDPOINT, &init)?;
let req = worker::Fetch::Request(req);
let mut res = req
.send()
.await
.map_err(|err| format!("failed to send logging request: {}", err))?;
if res.status_code() == 200 {
payload_mutex.logs.clear();
Ok(())
} else {
let body = res.text().await?;
Err(format!("logging request returned {}: {}", res.status_code(), body).into())
}
}
}
pub struct SyslogVisitor<'a>(&'a mut HashMap<String, serde_json::Value>);
impl<'a> Visit for SyslogVisitor<'a> {
fn record_f64(&mut self, field: &Field, value: f64) {
self.0
.insert(field.name().to_string(), serde_json::json!(value));
}
fn record_i64(&mut self, field: &Field, value: i64) {
self.0
.insert(field.name().to_string(), serde_json::json!(value));
}
fn record_u64(&mut self, field: &Field, value: u64) {
self.0
.insert(field.name().to_string(), serde_json::json!(value));
}
fn record_bool(&mut self, field: &Field, value: bool) {
self.0
.insert(field.name().to_string(), serde_json::json!(value));
}
fn record_str(&mut self, field: &Field, value: &str) {
self.0
.insert(field.name().to_string(), serde_json::json!(value));
}
fn record_error(&mut self, field: &Field, value: &(dyn std::error::Error + 'static)) {
self.0.insert(
field.name().to_string(),
serde_json::json!(value.to_string()),
);
}
fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
self.0.insert(
field.name().to_string(),
serde_json::json!(format!("{:?}", value)),
);
}
}
impl<S> Layer<S> for SyslogLogger
where
S: tracing::Subscriber,
{
fn on_event(
&self,
event: &tracing::Event<'_>,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
let timestamp = worker::Date::now().as_millis();
let mut fields = HashMap::new();
let mut visitor = SyslogVisitor(&mut fields);
event.record(&mut visitor);
let log_line = LogLine {
timestamp,
log_level: event.metadata().level().as_str(),
fields,
};
{
let msg = log_line
.fields
.get("message")
.unwrap_or(&serde_json::Value::Null);
let mut fields = vec![];
for (k, v) in &log_line.fields {
if k == "message" {
continue;
}
fields.push(format!("{} => {}", k, v));
}
let fields = fields.join(",");
match event.metadata().level() {
&Level::ERROR => {
worker::console_error!("{}; {}", msg, fields);
}
&Level::WARN => {
worker::console_warn!("{}; {}", msg, fields);
}
&Level::INFO => {
worker::console_log!("{}; {}", msg, fields);
}
&Level::DEBUG | &Level::TRACE => {
worker::console_debug!("{}; {}", msg, fields);
}
}
}
{
let mut payload = self.payload.lock().unwrap();
payload.logs.push(SyslogPayloadLine {
message: log_line,
priority: 1,
});
}
}
}