Skip to content

Commit

Permalink
fix: event emitter
Browse files Browse the repository at this point in the history
  • Loading branch information
sonnht-chotot committed Jun 17, 2021
1 parent ac429b6 commit b573c10
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 108 deletions.
4 changes: 4 additions & 0 deletions emitter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
const EventEmitter = require('events')
const emitter = new EventEmitter()

module.exports = emitter;
30 changes: 0 additions & 30 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,6 @@ const { FORMAT_HTTP_HEADERS, FORMAT_TEXT_MAP } = require("opentracing");
const { tagObject, isExcludedPath } = require("./helper");
const { getContext, setContext } = require("./context");
const {registration,signUpEvent} = require('./instruments')
const EventEmitter = require('events')

global.tracer=null;
global.emitter = new EventEmitter()

const {
PerformanceObserver,
performance
} = require('perf_hooks');

const obs = new PerformanceObserver((items) => {
if (global.tracer){
const span = globalTracer.startSpan("performance_hooks")
span.setTag("operation_name",items.getEntries()[0].name)
span.log({
originTime: performance.timeOrigin,
data: items.getEntries()[0],
message: "origin time in MS, duration in MS"
})
span.finish()

performance.clearMarks();
}

});
obs.observe({
entryTypes: ['measure']
});

const buildSpanName = req => {
const params = req.pathParams || req.params;
Expand Down Expand Up @@ -213,8 +185,6 @@ class JaegerMiddleware {
"response.body": stringify(res.resBody)
};
span.log(info);

span.finish();
} catch (error) {
console.log(error);
} finally {
Expand Down
5 changes: 4 additions & 1 deletion instruments.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'
const log4js = require('log4js')
const helper = require('./helper')
const emitter = require('./emitter')

log4js.levels = process.env.LOG_LEVEL || 'info'
const logger = log4js.getLogger('jaeger-client-nodejs-instruments')
Expand All @@ -20,7 +21,9 @@ const registration = ()=>{

const signUpEvent = span =>{
for (const module of modules) {
emitter.on(module,(db_type,operation_name,data)=>{
emitter.on(module,(...agrs)=>{
const eventData = agrs[0]
const {db_type,operation_name,data}=eventData
const child_span = global.tracer.startSpan(operation_name,{
childOf: span
})
Expand Down
128 changes: 51 additions & 77 deletions instruments_modules/mongodb.js
Original file line number Diff line number Diff line change
@@ -1,34 +1,9 @@
'use strict'
const { stringify } = require("flatted/cjs");
const opentracing = require("opentracing");
const context = require('../context')
const helper = require('../helper')

const emitter = require('../emitter')
const requestIdMap = {}

const onStart = (event)=>{
const {
databaseName,
commandName,
command,
requestId
}= event;
const {
filter={},
projection={},
limit,
query={}
} = command
const collection = command[commandName]
const operation_name = `${databaseName}.${collection}.${commandName}`
// set operation name
requestIdMap[requestId]=`${databaseName}.${collection}.${commandName}`
emitter.emit('mongodb','mongodb',operation_name,{
filter={},
projection={},
limit,
query={}
})
const onStart = (event) => {
// {
// address: '10.60.3.8:27017',
// connectionId: 2,
Expand All @@ -42,68 +17,67 @@ const onStart = (event)=>{
// '$db': 'user_profiler'
// }
// }
const {
databaseName,
commandName,
command,
requestId
} = event;
const {
filter = {},
projection = {},
limit,
query = {}
} = command
const collection = command[commandName]
const operation_name = `${databaseName}.${collection}.${commandName}`
// set operation name
requestIdMap[requestId] = `${databaseName}.${collection}.${commandName}`
if (!operation_name.includes('Index')) {
emitter.emit('mongodb', {
db_type: 'mongodb',
operation_name,
data: {
filter,
projection,
limit,
query,
}
})
}
}

const onEnd = (event)=>{
const onEnd = (event) => {
const {
requestId,
reply={},
duration=0
}= event;
reply = {},
duration = 0
} = event;
const {
cursor={},
value={}
cursor = {},
value = {}
} = reply
const operation_name = requestIdMap[requestId]+`_response`
const firstBatch = cursor?.firstBatch || []
emitter.emit('mongodb','mongodb',operation_name,{
command_duration_ms: duration,
write_operation_response: value,
read_operation_response: firstBatch
})

const operation_name = requestIdMap[requestId] + `_response`
const firstBatch = cursor ?.firstBatch || []
if (!operation_name.includes('Index')) {
emitter.emit('mongodb', {
db_type: 'mongodb',
operation_name,
data: {
command_duration_ms: duration,
write_operation_response: value,
read_operation_response: firstBatch
}
})
}
delete requestIdMap[requestId]

// {
// address: '10.60.3.8:27017',
// connectionId: 2,
// requestId: 3,
// commandName: 'find',
// duration: 16,
// reply: {
// cursor: { firstBatch: [Array], id: 0, ns: 'user_profiler.reasons' },
// ok: 1
// }
// }

// {
// address: '10.60.3.8:27017',
// connectionId: 1,
// requestId: 6,
// commandName: 'findAndModify',
// duration: 11,
// reply: {
// lastErrorObject: { n: 1, updatedExisting: true },
// value: {
// _id: 60caf63a9d634c2eb2a8b2ed,
// is_deleted: false,
// name: 'abccc',
// created_at: 2021-06-17T07:14:02.702Z,
// updated_at: 2021-06-17T07:27:40.084Z,
// __v: 0
// },
// ok: 1
// }
// }

}

const start = ()=>{
const start = () => {
const path = process.cwd()
const listener = require(path+'/node_modules/mongodb').instrument()
const listener = require(path + '/node_modules/mongodb').instrument()
listener.on("started", onStart)
listener.on("succeeded", onEnd)
// listener.on("failed", onEnd)
}

module.exports = {
Expand Down

0 comments on commit b573c10

Please sign in to comment.