Skip to content

Commit

Permalink
Merge pull request #47 from sysflow-telemetry/logging
Browse files Browse the repository at this point in the history
refactor(libs): refactor log messages
  • Loading branch information
araujof authored Aug 31, 2022
2 parents 45e87d8 + d6d3170 commit 3e6cf9c
Show file tree
Hide file tree
Showing 17 changed files with 225 additions and 257 deletions.
22 changes: 18 additions & 4 deletions src/libs/containercontext.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/** Copyright (C) 2019 IBM Corporation.
/** Copyright (C) 2022 IBM Corporation.
*
* Authors:
* Frederico Araujo <[email protected]>
Expand Down Expand Up @@ -65,24 +65,27 @@ ContainerObj *ContainerContext::createContainer(sinsp_threadinfo *ti) {
cont->incomplete = true;
return cont;
}

auto *cont = new ContainerObj();
setContainer(&cont, container);
if (cont->cont.name.compare(INCOMPLETE) == 0 ||
cont->cont.image.compare(INCOMPLETE_IMAGE) == 0) {
cont->incomplete = true;
}

if (m_cxt->isK8sEnabled()) {
std::cout << "Get pod for container" << std::endl;
std::shared_ptr<PodObj> pod = m_k8sCxt->getPod(ti);
if (pod != nullptr) {
std::cout << "setting pod id to " << pod->pod.id << " for container "
<< ti->m_container_id << std::endl;
SF_DEBUG(m_logger, "Setting pod id to " << pod->pod.id
<< " for container "
<< ti->m_container_id)
cont->cont.podId.set_string(pod->pod.id);
pod->refs++;
} else {
cont->cont.podId.set_null();
}
}

return cont;
}

Expand Down Expand Up @@ -125,12 +128,14 @@ ContainerObj *ContainerContext::getContainer(sinsp_threadinfo *ti) {
if (ti->m_container_id.empty()) {
return nullptr;
}

ContainerObj *ct = nullptr;
ContainerTable::iterator cont = m_containers.find(ti->m_container_id);
if (cont != m_containers.end()) {
if (cont->second->written && !cont->second->incomplete) {
return cont->second;
}

const sinsp_container_info::ptr_t container =
m_cxt->getInspector()->m_container_manager.get_container(
ti->m_container_id);
Expand All @@ -139,6 +144,7 @@ ContainerObj *ContainerContext::getContainer(sinsp_threadinfo *ti) {
// delete cont->second;
return cont->second;
}

if (cont->second->written && cont->second->incomplete) {
SF_DEBUG(m_logger,
"Container is written and includes name: " << container->m_name);
Expand All @@ -149,39 +155,47 @@ ContainerObj *ContainerContext::getContainer(sinsp_threadinfo *ti) {
cont->second->incomplete = false;
}
}

ct = cont->second;
setContainer(&ct, container);
}

if (ct == nullptr) {
ct = createContainer(ti);
} else {
if (m_cxt->isK8sEnabled()) {
reupPod(ti, ct);
}
}

if (ct == nullptr) {
return nullptr;
}

m_containers[ct->cont.id] = ct;
m_writer->writeContainer(&(ct->cont));
ct->written = true;

return ct;
}

void ContainerContext::reupPod(sinsp_threadinfo *ti, ContainerObj *cont) {
if (!m_cxt->isK8sEnabled()) {
return;
}

string podId = "";
if (!cont->cont.podId.is_null()) {
podId = cont->cont.podId.get_string();
}

if (!podId.empty()) {
auto pod1 = m_k8sCxt->getPod(podId);
if (pod1 != nullptr) {
pod1->refs--;
}
}

auto pod = m_k8sCxt->getPod(ti);
if (pod != nullptr) {
cont->cont.podId.set_string(pod->pod.id);
Expand Down
14 changes: 2 additions & 12 deletions src/libs/controlflowprocessor.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/** Copyright (C) 2019 IBM Corporation.
/** Copyright (C) 2022 IBM Corporation.
*
* Authors:
* Frederico Araujo <[email protected]>
Expand Down Expand Up @@ -48,7 +48,6 @@ inline void ControlFlowProcessor::processFlow(sinsp_evt *ev, OpFlags flag) {
ProcessObj *proc = m_processCxt->getProcess(ev, SFObjectState::REUP, created);
ProcessFlowObj *pfo = proc->pfo;
if (pfo == nullptr) {
// SF_INFO(m_logger, "Creating a new process flow")
processNewFlow(ev, proc, flag);
} else {
updateProcFlow(pfo, flag, ev);
Expand Down Expand Up @@ -87,7 +86,6 @@ inline void ControlFlowProcessor::updateProcFlow(ProcessFlowObj *pf,
if (flag == OP_CLONE) {
int res = utils::getSyscallResult(ev);
if (res == 0) {
// nf->netflow.numRRecvBytes += res;
pf->procflow.numThreadsCloned++;
} else if (res == -1) {
pf->procflow.numCloneErrors++;
Expand All @@ -98,7 +96,6 @@ inline void ControlFlowProcessor::updateProcFlow(ProcessFlowObj *pf,
}

inline void ControlFlowProcessor::removeAndWriteProcessFlow(ProcessObj *proc) {
// SF_INFO(m_logger, "removeAndWriteProcessFlow")
m_writer->writeProcessFlow(&((proc->pfo)->procflow), &(proc->proc));
m_processCxt->removeProcessFromSet(proc, true);
proc->pfo = nullptr;
Expand Down Expand Up @@ -169,7 +166,7 @@ void ControlFlowProcessor::exportProcessFlow(ProcessFlowObj *pfo) {
}

void ControlFlowProcessor::printFlowStats() {
SF_INFO(m_logger, "CF Set: " << m_pfSet->size());
SF_DEBUG(m_logger, "CF Set: " << m_pfSet->size());
}

int ControlFlowProcessor::checkForExpiredRecords() {
Expand All @@ -185,21 +182,14 @@ int ControlFlowProcessor::checkForExpiredRecords() {
int i = 0;
SF_DEBUG(m_logger, "Checking expired PROC Flows!!!....");
for (auto it = m_pfSet->begin(); it != m_pfSet->end();) {
// SF_INFO(m_logger, "Checking flow with exportTime: " <<
// (*it)->pfo->exportTime
// << " Now: " << now);
if (difftime(now, (*it)->pfo->exportTime) >= m_cxt->getNFExportInterval()) {
SF_DEBUG(m_logger, "Exporting Proc flow!!! ");
if (difftime(now, (*it)->pfo->lastUpdate) >=
m_cxt->getNFExpireInterval()) {
// SF_INFO(m_logger, "Deleting processflow...")
// SF_INFO(m_logger, "NOW: " << now << " LastUpdate: " <<
//(*it)->pfo->lastUpdate);
delete (*it)->pfo;
(*it)->pfo = nullptr;
it = m_pfSet->erase(it);
} else {
// SF_INFO(m_logger, "Exporting processflow...checkForExpired")
exportProcessFlow((*it)->pfo);
ProcessObj *p = (*it);
it = m_pfSet->erase(it);
Expand Down
9 changes: 7 additions & 2 deletions src/libs/dataflowprocessor.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/** Copyright (C) 2019 IBM Corporation.
/** Copyright (C) 2022 IBM Corporation.
*
* Authors:
* Frederico Araujo <[email protected]>
Expand Down Expand Up @@ -70,13 +70,15 @@ int DataFlowProcessor::handleDataEvent(sinsp_evt *ev, OpFlags flag) {
return 1;
}
}

if (fdinfo->is_ipv4_socket() || fdinfo->is_ipv6_socket()) {
return m_netflowPrcr->handleNetFlowEvent(ev, flag);
} else if (IS_FILE_EVT(flag)) {
return m_fileevtPrcr->handleFileFlowEvent(ev, flag);
} else {
return m_fileflowPrcr->handleFileFlowEvent(ev, flag);
}

return 2;
}

Expand All @@ -87,18 +89,21 @@ int DataFlowProcessor::removeAndWriteDFFromProc(ProcessObj *proc, int64_t tid) {

void DataFlowProcessor::printFlowStats() {
m_procCxt->printStats();
SF_INFO(m_logger, "DF Set: " << m_dfSet.size());
SF_DEBUG(m_logger, "DF Set: " << m_dfSet.size());
}

int DataFlowProcessor::checkForExpiredRecords() {
time_t now = utils::getCurrentTime(m_cxt);

if (m_lastCheck == 0) {
m_lastCheck = now;
return 0;
}

if (difftime(now, m_lastCheck) < 1.0) {
return 0;
}

m_lastCheck = now;
int i = 0;
SF_DEBUG(m_logger, "Checking expired Flows!!!....");
Expand Down
5 changes: 4 additions & 1 deletion src/libs/fileeventprocessor.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/** Copyright (C) 2019 IBM Corporation.
/** Copyright (C) 2022 IBM Corporation.
*
* Authors:
* Frederico Araujo <[email protected]>
Expand Down Expand Up @@ -36,6 +36,7 @@ FileEventProcessor::~FileEventProcessor() = default;

int FileEventProcessor::handleFileFlowEvent(sinsp_evt *ev, OpFlags flag) {
int res = 1;

if (flag == OP_MKDIR || flag == OP_RMDIR || flag == OP_UNLINK) {
res = writeFileEvent(ev, flag);
} else if (flag == OP_LINK || flag == OP_SYMLINK || flag == OP_RENAME) {
Expand Down Expand Up @@ -115,6 +116,7 @@ int FileEventProcessor::writeFileEvent(sinsp_evt *ev, OpFlags flag) {
bool created = false;
ProcessObj *proc = m_processCxt->getProcess(ev, SFObjectState::REUP, created);
sinsp_fdinfo_t *fdinfo = ev->get_fd_info();

FileObj *file = nullptr;
if (fdinfo != nullptr) {
file = m_fileCxt->getFile(ev, fdinfo, SFObjectState::CREATED, created);
Expand All @@ -136,6 +138,7 @@ int FileEventProcessor::writeFileEvent(sinsp_evt *ev, OpFlags flag) {
file = m_fileCxt->getFile(ev, fileName, fileType, SFObjectState::CREATED,
created);
}

m_fileEvt.opFlags = flag;
m_fileEvt.ts = ev->get_ts();
m_fileEvt.procOID.hpid = proc->proc.oid.hpid;
Expand Down
16 changes: 11 additions & 5 deletions src/libs/fileflowprocessor.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/** Copyright (C) 2019 IBM Corporation.
/** Copyright (C) 2022 IBM Corporation.
*
* Authors:
* Frederico Araujo <[email protected]>
Expand Down Expand Up @@ -178,15 +178,12 @@ int FileFlowProcessor::handleFileFlowEvent(sinsp_evt *ev, OpFlags flag) {
} else {
fd = utils::getFD(ev);
if (!utils::isMapAnonymous(ev) && fd != -1) {
// SF_INFO(m_logger, "FDs: " << fd );
sinsp_threadinfo *ti = ev->get_thread_info();
fdinfo = ti->get_fd(fd);
/* if(fdinfo) {
SF_INFO(m_logger, "Found fdinfo for MMAP")
}*/
}
}
}

if (fdinfo == nullptr) {
return 1;
}
Expand All @@ -202,6 +199,7 @@ int FileFlowProcessor::handleFileFlowEvent(sinsp_evt *ev, OpFlags flag) {
if (m_cxt->isFileOnly() && !fdinfo->is_file()) {
return 1;
}

switch (restype) {
case SF_FILE:
case SF_DIR:
Expand All @@ -211,6 +209,7 @@ int FileFlowProcessor::handleFileFlowEvent(sinsp_evt *ev, OpFlags flag) {
default:
return 1;
}

bool created = false;
// calling get process is important because it ensures that the process object
// has been written to the sysflow file. This is important for long running
Expand Down Expand Up @@ -240,6 +239,7 @@ int FileFlowProcessor::handleFileFlowEvent(sinsp_evt *ev, OpFlags flag) {
} else {
processExistingFlow(ev, proc, file, flag, flowkey, ff, fdinfo);
}

return 0;
}

Expand All @@ -257,6 +257,7 @@ void FileFlowProcessor::removeFileFlow(ProcessObj *proc, FileObj *file,
int FileFlowProcessor::removeAndWriteFFFromProc(ProcessObj *proc, int64_t tid) {
SF_DEBUG(m_logger, "CALLING removeAndWriteFFFromProc");
int deleted = 0;

for (FileFlowTable::iterator ffi = proc->fileflows.begin();
ffi != proc->fileflows.end(); ffi++) {
if (tid == -1 || tid == ffi->second->fileflow.tid) {
Expand Down Expand Up @@ -284,16 +285,19 @@ int FileFlowProcessor::removeAndWriteFFFromProc(ProcessObj *proc, int64_t tid) {
}
}
}

if (tid == -1) {
proc->fileflows.clear();
}

return deleted;
}

int FileFlowProcessor::removeFileFlowFromSet(FileFlowObj **ffo,
bool deleteFileFlow) {
bool found = false;
int removed = 0;

for (auto iter = m_dfSet->find(*ffo); iter != m_dfSet->end(); iter++) {
if (!((*iter)->isNetworkFlow)) {
auto *foundObj = static_cast<FileFlowObj *>(*iter);
Expand All @@ -310,6 +314,7 @@ int FileFlowProcessor::removeFileFlowFromSet(FileFlowObj **ffo,
}
}
}

if (!found) {
SF_ERROR(m_logger,
"Cannot find FileFlow Object "
Expand Down Expand Up @@ -344,6 +349,7 @@ int FileFlowProcessor::removeFileFlowFromSet(FileFlowObj **ffo,
SF_ERROR(m_logger, "Deleted File Flow...");
}
}

return removed;
}

Expand Down
Loading

0 comments on commit 3e6cf9c

Please sign in to comment.