Skip to content

Commit 107b31f

Browse files
committed
[filter] Support shared mode and output file rotating in tee() filter
1 parent 38f4ccf commit 107b31f

File tree

6 files changed

+190
-74
lines changed

6 files changed

+190
-74
lines changed

src/api/logging.cpp

+8-52
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ bool Logger::tail(const std::string &name, Data &buffer) {
118118
}
119119

120120
void Logger::close_all() {
121-
FileTarget::close_all_writers();
121+
122122
}
123123

124124
Logger::Logger(pjs::Str *name)
@@ -305,69 +305,24 @@ void Logger::StdoutTarget::write(const Data &msg) {
305305
// Logger::FileTarget
306306
//
307307

308-
void Logger::FileTarget::close_all_writers() {
309-
s_all_writers.clear();
310-
}
311-
312-
Logger::FileTarget::FileTarget(pjs::Str *filename)
308+
Logger::FileTarget::FileTarget(pjs::Str *filename, const Options &options)
313309
: m_filename(pjs::Str::make(fs::abs_path(filename->str())))
314-
{
315-
}
316-
317-
void Logger::FileTarget::write(const Data &msg) {
318-
auto name = m_filename->data()->retain();
319-
auto sd = SharedData::make(msg)->retain();
320-
Net::main().post(
321-
[=]() {
322-
Writer *writer = nullptr;
323-
const auto &filename = name->str();
324-
auto i = s_all_writers.find(filename);
325-
if (i != s_all_writers.end()) {
326-
writer = i->second.get();
327-
} else {
328-
writer = new Writer(filename);
329-
s_all_writers[filename].reset(writer);
330-
}
331-
InputContext ic;
332-
Data data;
333-
sd->to_data(data);
334-
writer->write(data);
335-
name->release();
336-
sd->release();
337-
}
338-
);
339-
}
340-
341-
//
342-
// Logger::FileTarget::Writer
343-
//
344-
345-
std::map<std::string, std::unique_ptr<Logger::FileTarget::Writer>> Logger::FileTarget::s_all_writers;
346-
347-
Logger::FileTarget::Writer::Writer(const std::string &filename)
348-
: m_module(new Module)
310+
, m_options(options)
311+
, m_module(new Module)
349312
{
350313
PipelineLayout *ppl = PipelineLayout::make();
351-
Tee::Options options;
352-
options.append = true;
353314
ppl->append(new Tee(filename, options));
354-
355315
m_pipeline_layout = ppl;
356316
m_pipeline = Pipeline::make(ppl, Context::make());
357317
}
358318

359-
void Logger::FileTarget::Writer::write(const Data &msg) {
319+
void Logger::FileTarget::write(const Data &msg) {
360320
Data *buf = Data::make();
361321
s_dp.push(buf, &msg);
362322
s_dp.push(buf, '\n');
363323
m_pipeline->input()->input(buf);
364324
}
365325

366-
void Logger::FileTarget::Writer::shutdown() {
367-
m_module->shutdown();
368-
m_pipeline = nullptr;
369-
}
370-
371326
//
372327
// Logger::SyslogTarget
373328
//
@@ -605,8 +560,9 @@ template<> void ClassDef<Logger>::init() {
605560

606561
method("toFile", [](Context &ctx, Object *obj, Value &ret) {
607562
pjs::Str *filename;
608-
if (!ctx.arguments(1, &filename)) return;
609-
obj->as<Logger>()->add_target(new Logger::FileTarget(filename));
563+
pjs::Object *options = nullptr;
564+
if (!ctx.arguments(1, &filename, &options)) return;
565+
obj->as<Logger>()->add_target(new Logger::FileTarget(filename, options));
610566
ret.set(obj);
611567
});
612568

src/api/logging.hpp

+10-19
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "module.hpp"
3232
#include "fstream.hpp"
3333
#include "filters/pack.hpp"
34+
#include "filters/tee.hpp"
3435
#include "filters/tls.hpp"
3536

3637
#include <atomic>
@@ -97,9 +98,12 @@ class Logger : public pjs::ObjectTemplate<Logger> {
9798

9899
class FileTarget : public Target {
99100
public:
100-
static void close_all_writers();
101+
struct Options : public Tee::Options {
102+
Options() { shared = true; append = true; }
103+
Options(pjs::Object *options) : Tee::Options(options) { shared = true; append = true; }
104+
};
101105

102-
FileTarget(pjs::Str *filename);
106+
FileTarget(pjs::Str *filename, const Options &options = Options());
103107

104108
private:
105109
virtual void write(const Data &msg) override;
@@ -116,24 +120,11 @@ class Logger : public pjs::ObjectTemplate<Logger> {
116120
}
117121
};
118122

119-
//
120-
// Logger::FileTarget::Writer
121-
//
122-
123-
class Writer {
124-
public:
125-
Writer(const std::string &filename);
126-
void write(const Data &msg);
127-
void shutdown();
128-
private:
129-
pjs::Ref<Module> m_module;
130-
pjs::Ref<PipelineLayout> m_pipeline_layout;
131-
pjs::Ref<Pipeline> m_pipeline;
132-
};
133-
134123
pjs::Ref<pjs::Str> m_filename;
135-
136-
static std::map<std::string, std::unique_ptr<Writer>> s_all_writers;
124+
Options m_options;
125+
pjs::Ref<Module> m_module;
126+
pjs::Ref<PipelineLayout> m_pipeline_layout;
127+
pjs::Ref<Pipeline> m_pipeline;
137128
};
138129

139130
//

src/filters/tee.cpp

+131-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@
2424
*/
2525

2626
#include "tee.hpp"
27-
#include "log.hpp"
27+
#include "fs.hpp"
28+
#include "input.hpp"
29+
#include "utils.hpp"
30+
31+
#include <algorithm>
2832

2933
namespace pipy {
3034

@@ -33,9 +37,21 @@ namespace pipy {
3337
//
3438

3539
Tee::Options::Options(pjs::Object *options) {
40+
Value(options, "shared")
41+
.get(shared)
42+
.check_nullable();
3643
Value(options, "append")
3744
.get(append)
3845
.check_nullable();
46+
Value(options, "maxFileSize")
47+
.get(max_file_size)
48+
.check_nullable();
49+
Value(options, "maxFileCount")
50+
.get(max_file_count)
51+
.check_nullable();
52+
Value(options, "rotateInterval")
53+
.get(rotate_interval)
54+
.check_nullable();
3955
}
4056

4157
//
@@ -73,6 +89,7 @@ void Tee::reset() {
7389
m_file->close();
7490
m_file = nullptr;
7591
}
92+
m_target = nullptr;
7693
m_resolved_filename = nullptr;
7794
}
7895

@@ -84,12 +101,18 @@ void Tee::process(Event *evt) {
84101
auto *s = filename.to_string();
85102
m_resolved_filename = s;
86103
s->release();
87-
m_file = File::make(m_resolved_filename->str());
88-
m_file->open_write(m_options.append);
104+
if (m_options.shared) {
105+
m_target = get_target(m_resolved_filename->str(), m_options);
106+
} else {
107+
m_file = File::make(m_resolved_filename->str());
108+
m_file->open_write(m_options.append);
109+
}
89110
}
90111

91112
if (m_file) {
92113
m_file->write(*data);
114+
} else if (m_target) {
115+
m_target->write(*data);
93116
}
94117

95118
} else if (evt->is<StreamEnd>()) {
@@ -102,4 +125,109 @@ void Tee::process(Event *evt) {
102125
output(evt);
103126
}
104127

128+
std::map<std::string, pjs::Ref<Tee::Target>> Tee::s_targets;
129+
std::mutex Tee::s_targets_mutex;
130+
131+
auto Tee::get_target(const std::string &filename, const Options &options) -> Target* {
132+
std::lock_guard<std::mutex> lock(s_targets_mutex);
133+
auto path = filename == "-" ? filename : fs::abs_path(filename);
134+
auto i = s_targets.find(path);
135+
if (i != s_targets.end()) return i->second;
136+
return s_targets[path] = new Target(path, options);
137+
}
138+
139+
Tee::Target::Target(const std::string &filename, const Tee::Options &options)
140+
: m_filename(filename)
141+
, m_options(options)
142+
{
143+
}
144+
145+
Tee::Target::~Target() {
146+
147+
}
148+
149+
void Tee::Target::write(const Data &data) {
150+
if (Net::main().is_running()) {
151+
auto sd = SharedData::make(data)->retain();
152+
Net::main().post([=]() {
153+
Net::main().post(
154+
[=]() {
155+
Data data;
156+
sd->to_data(data);
157+
write_async(data);
158+
sd->release();
159+
}
160+
);
161+
});
162+
} else {
163+
write_async(data);
164+
}
165+
}
166+
167+
void Tee::Target::write_async(const Data &data) {
168+
if (m_file && m_written_size > 0 && m_filename != "-" && (
169+
(m_options.max_file_size > 0 && m_written_size + data.size() > m_options.max_file_size) ||
170+
(m_options.rotate_interval > 0 && utils::now() - m_file_time > m_options.rotate_interval * 1000)
171+
)) {
172+
m_file->close();
173+
m_file = nullptr;
174+
175+
auto sec = std::floor(m_file_time / 1000);
176+
auto t = std::time_t(sec);
177+
std::tm tm;
178+
localtime_r(&t, &tm);
179+
180+
char str[100];
181+
auto len = std::strftime(str, sizeof(str), "%Y-%m-%d-%H-%M-%S-", &tm);
182+
auto date_filename = utils::path_join(
183+
utils::path_dirname(m_filename),
184+
std::string(str, len) + utils::path_basename(m_filename)
185+
);
186+
187+
fs::rename(m_filename, date_filename);
188+
189+
if (m_options.max_file_count > 0) {
190+
auto dirname = utils::path_dirname(m_filename);
191+
auto basename = utils::path_basename(m_filename);
192+
std::list<std::string> all;
193+
fs::read_dir(dirname, all);
194+
std::vector<std::string> names;
195+
for (const auto &name : all) {
196+
if (utils::ends_with(name, basename)) {
197+
names.push_back(name);
198+
}
199+
}
200+
if (names.size() > m_options.max_file_count) {
201+
std::sort(
202+
names.begin(), names.end(),
203+
[](const std::string &a, const std::string &b) -> bool { return a > b; }
204+
);
205+
while (names.size() > m_options.max_file_count) {
206+
auto name = names.back();
207+
names.pop_back();
208+
fs::unlink(utils::path_join(dirname, name));
209+
}
210+
}
211+
}
212+
}
213+
214+
if (!m_file) {
215+
m_file_time = utils::now();
216+
m_written_size = 0;
217+
if (m_filename != "-") {
218+
fs::Stat st;
219+
if (fs::stat(m_filename, st) && st.is_file()) {
220+
m_file_time = st.ctime * 1000;
221+
m_written_size = st.size;
222+
}
223+
}
224+
m_file = File::make(m_filename);
225+
m_file->open_write(m_options.append);
226+
}
227+
228+
InputContext ic;
229+
m_file->write(data);
230+
m_written_size += data.size();
231+
}
232+
105233
} // namespace pipy

src/filters/tee.hpp

+30
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
#include "options.hpp"
3333
#include "fstream.hpp"
3434

35+
#include <atomic>
36+
#include <string>
37+
3538
namespace pipy {
3639

3740
//
@@ -41,11 +44,32 @@ namespace pipy {
4144
class Tee : public Filter {
4245
public:
4346
struct Options : public pipy::Options {
47+
bool shared = false;
4448
bool append = false;
49+
int max_file_size = 0;
50+
int max_file_count = 0;
51+
double rotate_interval = 0;
4552
Options() {}
4653
Options(pjs::Object *options);
4754
};
4855

56+
class Target : public pjs::RefCountMT<Target> {
57+
public:
58+
Target(const std::string &filename, const Options &options);
59+
~Target();
60+
61+
void write(const Data &data);
62+
63+
private:
64+
const std::string m_filename;
65+
Options m_options;
66+
pjs::Ref<File> m_file;
67+
int m_written_size = 0;
68+
double m_file_time = 0;
69+
70+
void write_async(const Data &data);
71+
};
72+
4973
Tee(const pjs::Value &filename, const Options &options);
5074

5175
private:
@@ -61,6 +85,12 @@ class Tee : public Filter {
6185
pjs::Value m_filename;
6286
pjs::Ref<pjs::Str> m_resolved_filename;
6387
pjs::Ref<File> m_file;
88+
pjs::Ref<Target> m_target;
89+
90+
static std::map<std::string, pjs::Ref<Target>> s_targets;
91+
static std::mutex s_targets_mutex;
92+
93+
static auto get_target(const std::string &filename, const Options &options) -> Target*;
6494
};
6595

6696
} // namespace pipy

0 commit comments

Comments
 (0)