Skip to content

Commit

Permalink
feat(dumper): config.path can include {class}
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Jan 18, 2022
1 parent 3705968 commit ddafdb4
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
.idea
Cargo.lock

example.dump
*.dump
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- errors: add `TrySendError::map` and `RequestError::map`.
- dumping: write `ts` firstly to support the `sort` utility.
- dumper: extract a dump's name if it's not specified.
- dumper: support the `{class}` variable in config's `path` param.

### Fixed
- init: do not start termination if the memory tracker fails to read files.
Expand Down
12 changes: 7 additions & 5 deletions elfo-dumper/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use fxhash::FxHashSet;
use metrics::counter;
use parking_lot::Mutex;
use tokio::{task, time::Duration};
use tracing::{debug, error, info, warn};
use tracing::{error, info, warn};

use elfo_core as elfo;
use elfo_macros::{message, msg_raw as msg};
Expand Down Expand Up @@ -59,7 +59,7 @@ impl Dumper {
}

async fn main(mut self) -> Result<()> {
let mut file = open_file(self.ctx.config()).await;
let mut file = open_file(self.ctx.config(), self.ctx.key()).await;

let class = Box::leak(self.ctx.key().clone().into_boxed_str());
let registry = { self.storage.lock().registry(class) };
Expand All @@ -74,15 +74,17 @@ impl Dumper {

while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
// TODO: open on `ValidateConfig`
ReopenDumpFile | ConfigUpdated => {
let config = ctx.config();
interval.set_period(config.interval);
file = open_file(config).await;
file = open_file(config, self.ctx.key()).await;
}
DumpingTick => {
let registry = registry.clone();
let timeout = ctx.config().interval;

// TODO: run inside scope?
let report = task::spawn_blocking(move || -> Result<Report> {
let mut errors = Vec::new();
let mut written = 0;
Expand Down Expand Up @@ -214,13 +216,13 @@ pub(crate) fn new(storage: Arc<Mutex<Storage>>) -> Schema {
.exec(move |ctx| Dumper::new(ctx, storage_1.clone()).main())
}

async fn open_file(config: &Config) -> BufWriter<File> {
async fn open_file(config: &Config, class: &str) -> BufWriter<File> {
use tokio::fs::OpenOptions;

let file = OpenOptions::new()
.create(true)
.append(true)
.open(&config.path)
.open(&config.path(class))
.await
.expect("cannot open the dump file")
.into_std()
Expand Down
13 changes: 11 additions & 2 deletions elfo-dumper/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
use std::{path::PathBuf, time::Duration};
use std::time::Duration;

use serde::Deserialize;

#[derive(Debug, Deserialize)]
pub(crate) struct Config {
pub(crate) path: PathBuf,
/// A path to a dump file or template:
/// * `dumps/all.dump` - one file.
/// * `dumps/{class}.dump` - file per class.
path: String,
#[serde(with = "humantime_serde", default = "default_interval")]
pub(crate) interval: Duration,
#[serde(default = "default_registry_capacity")]
pub(crate) registry_capacity: usize,
}

impl Config {
pub(crate) fn path(&self, class: &str) -> String {
self.path.replace("{class}", class)
}
}

fn default_interval() -> Duration {
Duration::from_millis(500)
}
Expand Down
2 changes: 1 addition & 1 deletion elfo/examples/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ address = "0.0.0.0:9042"
#quantiles = [0.75, 0.9, 0.95, 0.99]

[system.dumpers]
path = "example.dump"
path = "example.{class}.dump"

[producers]
group_count = 3
Expand Down

0 comments on commit ddafdb4

Please sign in to comment.