-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathCursor.js
307 lines (282 loc) · 9.32 KB
/
Cursor.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
const Errors = require('./Errors');
const BinaryUtils = require('./internal/BinaryUtils');
const BinaryObject = require('./BinaryObject');
/**
* Class representing a cursor to obtain results of SQL and Scan query operations.
*
* The class has no public constructor. An instance of this class is obtained
* via query() method of {@link CacheClient} objects.
* One instance of this class returns results of one SQL or Scan query operation.
*
* @hideconstructor
*/
class Cursor {
/**
* Returns one element (cache entry - key-value pair) from the query results.
*
* Every new call returns the next cache entry from the query results.
* If the method returns null, no more entries are available.
*
* @async
*
* @return {Promise<CacheEntry>} - a cache entry (key-value pair).
*/
async getValue() {
if (!this._values || this._valueIndex >= this._values.length) {
await this._getValues();
this._valueIndex = 0;
}
if (this._values && this._values.length > 0) {
const value = this._values[this._valueIndex];
this._valueIndex++;
return value;
}
return null;
}
/**
* Checks if more elements are available in the query results.
*
* @return {boolean} - true if more cache entries are available, false otherwise.
*/
hasMore() {
return this._hasNext ||
this._values && this._valueIndex < this._values.length;
}
/**
* Returns all elements (cache entries - key-value pairs) from the query results.
*
* May be used instead of getValue() method if the number of returned entries
* is relatively small and will not cause memory utilization issues.
*
* @async
*
* @return {Promise<Array<CacheEntry>>} - all cache entries (key-value pairs)
* returned by SQL or Scan query.
*/
async getAll() {
let result = new Array();
let values;
do {
values = await this._getValues();
if (values) {
result = result.concat(values);
}
} while (this._hasNext);
return result;
}
/**
* Closes the cursor. Obtaining elements from the results is not possible after this.
*
* This method should be called if no more elements are needed.
* It is not neccessary to call it if all elements have been already obtained.
*
* @async
*/
async close() {
// Close cursor only if the server has more pages: the server closes cursor automatically on last page
if (this._id && this._hasNext) {
await this._communicator.send(
BinaryUtils.OPERATION.RESOURCE_CLOSE,
async (payload) => {
await this._write(payload);
});
}
}
/** Private methods */
/**
* @ignore
*/
constructor(communicator, operation, buffer, keyType = null, valueType = null) {
this._communicator = communicator;
this._operation = operation;
this._buffer = buffer;
this._keyType = keyType;
this._valueType = valueType;
this._id = null;
this._hasNext = false;
this._values = null;
this._valueIndex = 0;
}
/**
* @ignore
*/
async _getNext() {
this._hasNext = false;
this._values = null;
this._buffer = null;
await this._communicator.send(
this._operation,
async (payload) => {
await this._write(payload);
},
async (payload) => {
this._buffer = payload;
});
}
/**
* @ignore
*/
async _getValues() {
if (!this._buffer && this._hasNext) {
await this._getNext();
}
await this._read(this._buffer)
this._buffer = null;
return this._values;
}
/**
* @ignore
*/
async _write(buffer) {
buffer.writeLong(this._id);
}
/**
* @ignore
*/
_readId(buffer) {
this._id = buffer.readLong();
}
/**
* @ignore
*/
async _readRow(buffer) {
const CacheEntry = require('./CacheClient').CacheEntry;
return new CacheEntry(
await this._communicator.readObject(buffer, this._keyType),
await this._communicator.readObject(buffer, this._valueType));
}
/**
* @ignore
*/
async _read(buffer) {
const rowCount = buffer.readInteger();
this._values = new Array(rowCount);
for (let i = 0; i < rowCount; i++) {
this._values[i] = await this._readRow(buffer);
}
this._hasNext = buffer.readBoolean();
}
}
/**
* Class representing a cursor to obtain results of SQL Fields query operation.
*
* The class has no public constructor. An instance of this class is obtained
* via query() method of {@link CacheClient} objects.
* One instance of this class returns results of one SQL Fields query operation.
*
* @hideconstructor
* @extends Cursor
*/
class SqlFieldsCursor extends Cursor {
/**
* Returns one element (array with values of the fields) from the query results.
*
* Every new call returns the next element from the query results.
* If the method returns null, no more elements are available.
*
* @async
*
* @return {Promise<Array<*>>} - array with values of the fields requested by the query.
*
*/
async getValue() {
return await super.getValue();
}
/**
* Returns all elements (arrays with values of the fields) from the query results.
*
* May be used instead of getValue() method if the number of returned elements
* is relatively small and will not cause memory utilization issues.
*
* @async
*
* @return {Promise<Array<Array<*>>>} - all results returned by SQL Fields query.
* Every element of the array is an array with values of the fields requested by the query.
*
*/
async getAll() {
return await super.getAll();
}
/**
* Returns names of the fields which were requested in the SQL Fields query.
*
* Empty array is returned if "include field names" flag was false in the query.
*
* @return {Array<string>} - field names.
* The order of names corresponds to the order of field values returned in the results of the query.
*/
getFieldNames() {
return this._fieldNames;
}
/**
* Specifies types of the fields returned by the SQL Fields query.
*
* By default, a type of every field is not specified that means during operations the Ignite client
* will try to make automatic mapping between JavaScript types and Ignite object types -
* according to the mapping table defined in the description of the {@link ObjectType} class.
*
* @param {...ObjectType.PRIMITIVE_TYPE | CompositeType} fieldTypes - types of the returned fields.
* The order of types must correspond the order of field values returned in the results of the query.
* A type of every field can be:
* - either a type code of primitive (simple) type
* - or an instance of class representing non-primitive (composite) type
* - or null (means the type is not specified)
*
* @return {SqlFieldsCursor} - the same instance of the SqlFieldsCursor.
*/
setFieldTypes(...fieldTypes) {
this._fieldTypes = fieldTypes;
return this;
}
/** Private methods */
/**
* @ignore
*/
constructor(communicator, buffer) {
super(communicator, BinaryUtils.OPERATION.QUERY_SQL_FIELDS_CURSOR_GET_PAGE, buffer);
this._fieldNames = [];
}
/**
* @ignore
*/
async _readFieldNames(buffer, includeFieldNames) {
this._id = buffer.readLong();
this._fieldCount = buffer.readInteger();
if (includeFieldNames) {
for (let i = 0; i < this._fieldCount; i++) {
this._fieldNames[i] = await this._communicator.readObject(buffer);
}
}
}
/**
* @ignore
*/
async _readRow(buffer) {
let values = new Array(this._fieldCount);
let fieldType;
for (let i = 0; i < this._fieldCount; i++) {
fieldType = this._fieldTypes && i < this._fieldTypes.length ? this._fieldTypes[i] : null;
values[i] = await this._communicator.readObject(buffer, fieldType);
}
return values;
}
}
module.exports.Cursor = Cursor;
module.exports.SqlFieldsCursor = SqlFieldsCursor;