Skip to content

Commit

Permalink
Merge pull request #1677 from telefonicaid/task/use_cb_flow_control_w…
Browse files Browse the repository at this point in the history
…hen_update

use options=flowControl when update
  • Loading branch information
fgalan authored Jan 14, 2025
2 parents 3775aa9 + 5327af3 commit b852a43
Show file tree
Hide file tree
Showing 44 changed files with 310 additions and 232 deletions.
4 changes: 3 additions & 1 deletion CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
- Add: option to force to use CB flow control with new API field useCBflowControl at group and device device level (#1420)
- Add: useCBflowControl config setting (IOTA_CB_FLOW_CONTROL env var) to set CB flow control behaviour at instance level (#1420)
- Add: store last measure in device (by id, apikey, service and subservice) and new API field storeLastMeasure at group and device levels (#1669)
- Add: IOTA_STORE_LAST_MEASURE env var to set default store last measure behaviour at instance level (#1669)
- Add: storeLastMeasure config setting (IOTA_STORE_LAST_MEASURE env var) to set default store last measure behaviour at instance level (#1669)
- Upgrade express dep from 4.19.2 to 4.20.0
- Upgrade mongodb devdep from 4.17.1 to 4.17.2
- Upgrade mongoose dep from 5.13.20 to 8.8.4 (solving vulnerability CVE-2024-53900) (#1674)
1 change: 1 addition & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ var config = {
deviceRegistrationDuration: 'P1M',
defaultType: 'Thing',
expressLimit: '1Mb',
useCBflowControl: false,
storeLastMeasure: false
};

Expand Down
10 changes: 8 additions & 2 deletions doc/admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,8 @@ environment variable).
#### `storeLastMeasure`

If this flag is activated, last measure arrived to Device IoTAgent without be processed will be stored in Device under
`lastMeasure` field (composed of sub-fields `timestamp` and `measure` for the measure itself, in multi-measure format). This flag is overwritten by `storeLastMeasure` flag in group or device. This flag
is disabled by default.
`lastMeasure` field (composed of sub-fields `timestamp` and `measure` for the measure itself, in multi-measure format).
This flag is overwritten by `storeLastMeasure` flag in group or device. This flag is disabled by default.

For example in a device document stored in MongoDB will be extended with a subdocument named lastMeasure like this:

Expand All @@ -444,6 +444,11 @@ For example in a device document stored in MongoDB will be extended with a subdo
}
```

#### `useCBflowControl`

If this flag is activated, when iotAgent invokes Context Broker will use [flowControl option](https://github.com/telefonicaid/fiware-orion/blob/master/doc/manuals/admin/perf_tuning.md#updates-flow-control-mechanism). This flag is overwritten by
`useCBflowControl` flag in group or device. This flag is disabled by default.

### Configuration using environment variables

Some of the configuration parameters can be overriden with environment variables, to ease the use of those parameters
Expand Down Expand Up @@ -507,6 +512,7 @@ overrides.
| IOTA_RELAX_TEMPLATE_VALIDATION | `relaxTemplateValidation` |
| IOTA_EXPRESS_LIMIT | `expressLimit` |
| IOTA_STORE_LAST_MEASURE | `storeLastMeasure` |
| IOTA_CB_FLOW_CONTROL | `useCBflowControl` |

Note:

Expand Down
9 changes: 7 additions & 2 deletions doc/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1777,7 +1777,8 @@ Config group is represented by a JSON object with the following fields:
| `payloadType` || string | | optional string value used to switch between **IoTAgent**, **NGSI-v2** and **NGSI-LD** measure payloads types. Possible values are: `iotagent`, `ngsiv2` or `ngsild`. The default is `iotagent`. |
| `transport` || `string` | | Transport protocol used by the group of devices to send updates, for the IoT Agents with multiple transport protocols. |
| `endpoint` || `string` | | Endpoint where the group of device is going to receive commands, if any. |
| `storeLastMeasure` || `boolean` | | Store in device last measure received. See more info [in this section](admin.md#storelastmeasure). False by default |
| `storeLastMeasure` || `boolean` | | Store in device last measure received. See more info [in this section](admin.md#storelastmeasure). False by default |
| `useCBflowControl` || `boolean` | | Use Context Broker flow control. See more info [in this section](admin.md#useCBflowControl). False by default |

### Config group operations

Expand Down Expand Up @@ -1999,7 +2000,11 @@ the API resource fields and the same fields in the database model.
| `ngsiVersion` || `string` | | string value used in mixed mode to switch between **NGSI-v2** and **NGSI-LD** payloads. The default is `v2`. When not running in mixed mode, this field is ignored. |
| `payloadType` || `string` | | optional string value used to switch between **IoTAgent**, **NGSI-v2** and **NGSI-LD** measure payloads types. Possible values are: `iotagent`, `ngsiv2` or `ngsild`. The default is `iotagent`. |

| `storeLastMeasure` || `boolean` | | Store in device last measure received. See more info [in this section](admin.md#storelastmeasure). False by default. |
| `storeLastMeasure` | ✓ | `boolean` | | Store in device last measure received. See more info
[in this section](admin.md#storelastmeasure). False by default.

| `useCBflowControl` | ✓ | `boolean` | | Use Context Broker flow control. See more info
[in this section](admin.md#useCBflowControl). False by default.

### Device operations

Expand Down
7 changes: 7 additions & 0 deletions lib/commonConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ function processEnvironmentVariables() {
'IOTA_LD_SUPPORT_NULL',
'IOTA_LD_SUPPORT_DATASET_ID',
'IOTA_EXPRESS_LIMIT',
'IOTA_USE_CB_FLOW_CONTROL',
'IOTA_STORE_LAST_MEASURE'
];
const iotamVariables = [
Expand Down Expand Up @@ -475,6 +476,11 @@ function processEnvironmentVariables() {
} else {
config.expressLimit = config.expressLimit ? config.expressLimit : '1mb';
}
if (process.env.IOTA_USE_CB_FLOW_CONTROL) {
config.useCBflowControl = process.env.IOTA_USE_CB_FLOW_CONTROL === 'true';
} else {
config.useCBflowControl = config.useCBflowControl === true;
}
if (process.env.IOTA_STORE_LAST_MEASURE) {
config.storeLastMeasure = process.env.IOTA_STORE_LAST_MEASURE === 'true';
} else {
Expand Down Expand Up @@ -510,6 +516,7 @@ function getConfigForTypeInformation() {
relaxTemplateValidation: config.relaxTemplateValidation,
defaultEntityNameConjunction: config.defaultEntityNameConjunction,
defaultType: config.defaultType,
useCBflowControl: config.useCBflowControl,
storeLastMeasure: config.storeLastMeasure
};
return conf;
Expand Down
1 change: 1 addition & 0 deletions lib/model/Device.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const Device = new Schema({
explicitAttrs: Group.ExplicitAttrsType,
ngsiVersion: String,
payloadType: String,
useCBflowControl: Boolean,
storeLastMeasure: Boolean,
lastMeasure: Object
});
Expand Down
1 change: 1 addition & 0 deletions lib/model/Group.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const Group = new Schema({
ngsiVersion: String,
entityNameExp: String,
payloadType: String,
useCBflowControl: Boolean,
storeLastMeasure: Boolean
});

Expand Down
1 change: 1 addition & 0 deletions lib/services/common/iotManagerService.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ function register(callback) {
payloadType: service.payloadType,
endpoint: service.endpoint,
transport: service.transport,
useCBflowControl: service.useCBflowControl,
storeLastMeasure: service.storeLastMeasure
};
}
Expand Down
2 changes: 2 additions & 0 deletions lib/services/devices/deviceRegistryMongoDB.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const attributeList = [
'ngsiVersion',
'subscriptions',
'payloadType',
'useCBflowControl',
'storeLastMeasure'
];

Expand Down Expand Up @@ -320,6 +321,7 @@ function update(previousDevice, device, callback) {
data.timestamp = device.timestamp;
data.subscriptions = device.subscriptions;
data.payloadType = device.payloadType;
data.useCBflowControl = device.useCBflowControl;
data.storeLastMeasure = device.storeLastMeasure;

/* eslint-disable-next-line new-cap */
Expand Down
3 changes: 3 additions & 0 deletions lib/services/devices/deviceService.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ function mergeDeviceWithConfiguration(fields, defaults, deviceData, configuratio
if (configuration && configuration.payloadType !== undefined && deviceData.payloadType === undefined) {
deviceData.payloadType = configuration.payloadType;
}
if (configuration && configuration.useCBflowControl !== undefined && deviceData.useCBflowControl === undefined) {
deviceData.useCBflowControl = configuration.useCBflowControl;
}
if (configuration && configuration.storeLastMeasure !== undefined && deviceData.storeLastMeasure === undefined) {
deviceData.storeLastMeasure = configuration.storeLastMeasure;
}
Expand Down
3 changes: 3 additions & 0 deletions lib/services/devices/devices-NGSI-v2.js
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ function updateRegisterDeviceNgsi2(deviceObj, previousDevice, entityInfoUpdated,
if ('transport' in newDevice && newDevice.transport !== undefined) {
oldDevice.transport = newDevice.transport;
}
if ('useCBflowControl' in newDevice && newDevice.useCBflowControl !== undefined) {
oldDevice.useCBflowControl = newDevice.useCBflowControl;
}
if ('storeLastMeasure' in newDevice && newDevice.storeLastMeasure !== undefined) {
oldDevice.storeLastMeasure = newDevice.storeLastMeasure;
}
Expand Down
1 change: 1 addition & 0 deletions lib/services/groups/groupRegistryMongoDB.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const attributeList = [
'ngsiVersion',
'entityNameExp',
'payloadType',
'useCBflowControl',
'storeLastMeasure'
];

Expand Down
7 changes: 6 additions & 1 deletion lib/services/ngsi/entities-NGSI-v2.js
Original file line number Diff line number Diff line change
Expand Up @@ -653,8 +653,10 @@ function sendUpdateValueNgsi2(entityName, originMeasures, originTypeInformation,
}
}
} // end for (let measures of originMeasures)

let url = '/v2/op/update';
if (originTypeInformation.useCBflowControl) {
url += '?options=flowControl';
}
let options = NGSIUtils.createRequestObject(url, originTypeInformation, token);
options.json = payload;

Expand All @@ -680,6 +682,9 @@ function sendUpdateValueNgsi2(entityName, originMeasures, originTypeInformation,
if (!multi) {
// recreate options object to use single entity update
url = '/v2/entities?options=upsert';
if (originTypeInformation.useCBflowControl) {
url += ',flowControl';
}
options = NGSIUtils.createRequestObject(url, originTypeInformation, token);
delete payload.actionType;

Expand Down
3 changes: 3 additions & 0 deletions lib/services/northBound/deviceProvisioningServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const provisioningAPITranslation = {
ngsiVersion: 'ngsiVersion',
entityNameExp: 'entityNameExp',
payloadType: 'payloadType',
useCBflowControl: 'useCBflowControl',
storeLastMeasure: 'storeLastMeasure',
lastMeasure: 'lastMeasure'
};
Expand Down Expand Up @@ -146,6 +147,7 @@ function handleProvision(req, res, next) {
explicitAttrs: body.explicitAttrs,
ngsiVersion: body.ngsiVersion,
payloadType: body.payloadType,
useCBflowControl: body.useCBflowControl,
storeLastMeasure: body.storeLastMeasure,
lastMeasure: body.lastMeasure
});
Expand Down Expand Up @@ -225,6 +227,7 @@ function toProvisioningAPIFormat(device) {
explicitAttrs: device.explicitAttrs,
ngsiVersion: device.ngsiVersion,
payloadType: device.payloadType,
useCBflowControl: device.useCBflowControl,
storeLastMeasure: device.storeLastMeasure,
lastMeasure: device.lastMeasure
};
Expand Down
4 changes: 4 additions & 0 deletions lib/templates/updateDevice.json
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@
"description": "Payload type",
"type": "string"
},
"useCBflowControl": {
"description": "use CB flowControl option",
"type": "boolean"
},
"contentType": {
"description": "Content type",
"type": "string"
Expand Down
4 changes: 4 additions & 0 deletions lib/templates/updateDeviceLax.json
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@
"description": "Payload type allowed for measures for this device",
"type": "string"
},
"useCBflowControl": {
"description": "use CB flowControl option",
"type": "boolean"
}
"storeLastMeasure": {
"description": "Store last measure",
"type": "boolean"
Expand Down
5 changes: 3 additions & 2 deletions test/functional/config-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ config.amqp = {
};

config.iota = {
logLevel: 'FATAL',
logLevel: 'DEBUG',
contextBroker: {
host: '192.168.1.1',
port: '1026',
Expand All @@ -61,7 +61,8 @@ config.iota = {
service: 'smartgondor',
subservice: '/gardens',
providerUrl: 'http://localhost:4041',
types: {}
types: {},
useCBflowControl: true
};

config.defaultKey = '1234';
Expand Down
4 changes: 2 additions & 2 deletions test/functional/testUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ async function testCase(measure, expectation, provision, env, config, type, tran
let cbMockRoute = '';
// Set the correct route depending if the test is multientity or not
if (type === 'multientity' || type === 'multimeasure') {
cbMockRoute = '/v2/op/update';
cbMockRoute = '/v2/op/update?options=flowControl';
} else {
cbMockRoute = '/v2/entities?options=upsert';
cbMockRoute = '/v2/entities?options=upsert,flowControl';
}

// Set the correct mock times depending if the test is multimeasure or not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"entity_type": "SensorMachine",
"trust": "8970A9078A803H3BL98PINEQRW8342HBAMS",
"cbHost": "http://unexistentHost:1026",
"useCBflowControl": true,
"commands": [
{
"name": "wheel1",
Expand Down
9 changes: 5 additions & 4 deletions test/unit/general/contextBrokerKeystoneSecurityAccess-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ const iotAgentConfig = {
service: 'smartgondor',
subservice: 'gardens',
providerUrl: 'http://smartgondor.com',
deviceRegistrationDuration: 'P1M'
deviceRegistrationDuration: 'P1M',
useCBflowControl: true
};

describe('NGSI-v2 - Secured access to the Context Broker with Keystone', function () {
Expand Down Expand Up @@ -128,7 +129,7 @@ describe('NGSI-v2 - Secured access to the Context Broker with Keystone', functio
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', 'electricity')
.matchHeader('X-Auth-Token', '12345679ABCDEF')
.post('/v2/entities?options=upsert')
.post('/v2/entities?options=upsert,flowControl')
.reply(204);
iotAgentLib.activate(iotAgentConfig, done);
});
Expand Down Expand Up @@ -165,7 +166,7 @@ describe('NGSI-v2 - Secured access to the Context Broker with Keystone', functio
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', 'electricity')
.matchHeader('X-Auth-Token', '12345679ABCDEF')
.post('/v2/entities?options=upsert')
.post('/v2/entities?options=upsert,flowControl')
.reply(403, { name: 'ACCESS_FORBIDDEN' });

iotAgentLib.activate(iotAgentConfig, done);
Expand Down Expand Up @@ -197,7 +198,7 @@ describe('NGSI-v2 - Secured access to the Context Broker with Keystone', functio
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', 'electricity')
.matchHeader('X-Auth-Token', '12345679ABCDEF')
.post('/v2/entities?options=upsert');
.post('/v2/entities?options=upsert,flowControl');

iotAgentLib.activate(iotAgentConfig, done);
});
Expand Down
11 changes: 6 additions & 5 deletions test/unit/general/deviceService-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ const iotAgentConfig = {
service: 'smartgondor',
subservice: 'gardens',
providerUrl: 'http://smartgondor.com',
deviceRegistrationDuration: 'P1M'
deviceRegistrationDuration: 'P1M',
useCBflowControl: true
};
const groupCreation = {
url: 'http://localhost:' + iotAgentConfig.server.port + '/iot/services',
Expand Down Expand Up @@ -222,7 +223,7 @@ describe('NGSI-v2 - Device Service: utils', function () {
contextBrokerMock
.matchHeader('fiware-service', 'testservice')
.matchHeader('fiware-servicepath', '/testingPath')
.post('/v2/entities?options=upsert')
.post('/v2/entities?options=upsert,flowControl')
.reply(204);

async.series(
Expand Down Expand Up @@ -255,7 +256,7 @@ describe('NGSI-v2 - Device Service: utils', function () {
contextBrokerMock
.matchHeader('fiware-service', 'testservice')
.matchHeader('fiware-servicepath', '/testingPath')
.post('/v2/entities?options=upsert')
.post('/v2/entities?options=upsert,flowControl')
.reply(204);

async.series(
Expand Down Expand Up @@ -290,7 +291,7 @@ describe('NGSI-v2 - Device Service: utils', function () {
contextBrokerMock
.matchHeader('fiware-service', 'testservice')
.matchHeader('fiware-servicepath', '/testingPath')
.post('/v2/entities?options=upsert')
.post('/v2/entities?options=upsert,flowControl')
.reply(204);

async.series([request.bind(request, groupCreation)], function (error, results) {
Expand Down Expand Up @@ -326,7 +327,7 @@ describe('NGSI-v2 - Device Service: utils', function () {
contextBrokerMock
.matchHeader('fiware-service', 'testservice')
.matchHeader('fiware-servicepath', '/testingPath')
.post('/v2/entities?options=upsert')
.post('/v2/entities?options=upsert,flowControl')
.reply(204);

async.series([request.bind(request, configGroupCreation)], function (error, results) {
Expand Down
Loading

0 comments on commit b852a43

Please sign in to comment.