From ddafdb4fa21faccc413abb4d94b9540771225c7e Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Sun, 9 Jan 2022 17:15:50 +0300 Subject: [PATCH] feat(dumper): `config.path` can include `{class}` --- .gitignore | 2 +- CHANGELOG.md | 1 + elfo-dumper/src/actor.rs | 12 +++++++----- elfo-dumper/src/config.rs | 13 +++++++++++-- elfo/examples/config.toml | 2 +- 5 files changed, 21 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index 4bb814b0..b3d88587 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,4 @@ .idea Cargo.lock -example.dump +*.dump diff --git a/CHANGELOG.md b/CHANGELOG.md index 7779ce2d..5bb95166 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - errors: add `TrySendError::map` and `RequestError::map`. - dumping: write `ts` firstly to support the `sort` utility. - dumper: extract a dump's name if it's not specified. +- dumper: support the `{class}` variable in config's `path` param. ### Fixed - init: do not start termination if the memory tracker fails to read files. diff --git a/elfo-dumper/src/actor.rs b/elfo-dumper/src/actor.rs index a820a19f..608ab7e1 100644 --- a/elfo-dumper/src/actor.rs +++ b/elfo-dumper/src/actor.rs @@ -11,7 +11,7 @@ use fxhash::FxHashSet; use metrics::counter; use parking_lot::Mutex; use tokio::{task, time::Duration}; -use tracing::{debug, error, info, warn}; +use tracing::{error, info, warn}; use elfo_core as elfo; use elfo_macros::{message, msg_raw as msg}; @@ -59,7 +59,7 @@ impl Dumper { } async fn main(mut self) -> Result<()> { - let mut file = open_file(self.ctx.config()).await; + let mut file = open_file(self.ctx.config(), self.ctx.key()).await; let class = Box::leak(self.ctx.key().clone().into_boxed_str()); let registry = { self.storage.lock().registry(class) }; @@ -74,15 +74,17 @@ impl Dumper { while let Some(envelope) = ctx.recv().await { msg!(match envelope { + // TODO: open on `ValidateConfig` ReopenDumpFile | ConfigUpdated => { let config = ctx.config(); interval.set_period(config.interval); - file = open_file(config).await; + file = open_file(config, self.ctx.key()).await; } DumpingTick => { let registry = registry.clone(); let timeout = ctx.config().interval; + // TODO: run inside scope? let report = task::spawn_blocking(move || -> Result { let mut errors = Vec::new(); let mut written = 0; @@ -214,13 +216,13 @@ pub(crate) fn new(storage: Arc>) -> Schema { .exec(move |ctx| Dumper::new(ctx, storage_1.clone()).main()) } -async fn open_file(config: &Config) -> BufWriter { +async fn open_file(config: &Config, class: &str) -> BufWriter { use tokio::fs::OpenOptions; let file = OpenOptions::new() .create(true) .append(true) - .open(&config.path) + .open(&config.path(class)) .await .expect("cannot open the dump file") .into_std() diff --git a/elfo-dumper/src/config.rs b/elfo-dumper/src/config.rs index e5a9783a..42264354 100644 --- a/elfo-dumper/src/config.rs +++ b/elfo-dumper/src/config.rs @@ -1,16 +1,25 @@ -use std::{path::PathBuf, time::Duration}; +use std::time::Duration; use serde::Deserialize; #[derive(Debug, Deserialize)] pub(crate) struct Config { - pub(crate) path: PathBuf, + /// A path to a dump file or template: + /// * `dumps/all.dump` - one file. + /// * `dumps/{class}.dump` - file per class. + path: String, #[serde(with = "humantime_serde", default = "default_interval")] pub(crate) interval: Duration, #[serde(default = "default_registry_capacity")] pub(crate) registry_capacity: usize, } +impl Config { + pub(crate) fn path(&self, class: &str) -> String { + self.path.replace("{class}", class) + } +} + fn default_interval() -> Duration { Duration::from_millis(500) } diff --git a/elfo/examples/config.toml b/elfo/examples/config.toml index 25dd6fb4..1ad119a3 100644 --- a/elfo/examples/config.toml +++ b/elfo/examples/config.toml @@ -34,7 +34,7 @@ address = "0.0.0.0:9042" #quantiles = [0.75, 0.9, 0.95, 0.99] [system.dumpers] -path = "example.dump" +path = "example.{class}.dump" [producers] group_count = 3