From 809ac25a2903ebbc9304e75f35d766ec7cba6a13 Mon Sep 17 00:00:00 2001
From: AnthonyTsu1984 <115786031+AnthonyTsu1984@users.noreply.github.com>
Date: Sun, 24 Nov 2024 10:25:04 +0800
Subject: [PATCH] Update prepare-source-data.md

---
 .../data-import/prepare-source-data.md | 451 +++++++++++++-----
 1 file changed, 336 insertions(+), 115 deletions(-)

diff --git a/site/en/userGuide/data-import/prepare-source-data.md b/site/en/userGuide/data-import/prepare-source-data.md
index e1a87c146..b8342eb05 100644
--- a/site/en/userGuide/data-import/prepare-source-data.md
+++ b/site/en/userGuide/data-import/prepare-source-data.md

```python
schema = MilvusClient.create_schema(
    enable_dynamic_field=True
)

DIM = 512

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="bool", datatype=DataType.BOOL)
schema.add_field(field_name="int8", datatype=DataType.INT8)
schema.add_field(field_name="int16", datatype=DataType.INT16)
schema.add_field(field_name="int32", datatype=DataType.INT32)
schema.add_field(field_name="int64", datatype=DataType.INT64)
schema.add_field(field_name="float", datatype=DataType.FLOAT)
schema.add_field(field_name="double", datatype=DataType.DOUBLE)
schema.add_field(field_name="varchar", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="json", datatype=DataType.JSON)
schema.add_field(field_name="array_str", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.VARCHAR, max_length=128)
schema.add_field(field_name="array_int", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.INT64)
schema.add_field(field_name="float_vector", datatype=DataType.FLOAT_VECTOR, dim=DIM)
schema.add_field(field_name="binary_vector", datatype=DataType.BINARY_VECTOR, dim=DIM)
schema.add_field(field_name="float16_vector", datatype=DataType.FLOAT16_VECTOR, dim=DIM)
# Generating BFLOAT16 vectors requires an extra dependency (TensorFlow in this
# example), so the field is commented out; see gen_bf16_vector() further down.
# schema.add_field(field_name="bfloat16_vector", datatype=DataType.BFLOAT16_VECTOR, dim=DIM)
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)

schema.verify()

print(schema)
```

```java
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.milvus.bulkwriter.BulkImport;
import io.milvus.bulkwriter.RemoteBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.common.clientenum.CloudStorage;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
import io.milvus.bulkwriter.request.describe.MilvusDescribeImportRequest;
import io.milvus.bulkwriter.request.import_.MilvusImportRequest;
import io.milvus.bulkwriter.request.list.MilvusListImportJobsRequest;
import io.milvus.common.utils.Float16Utils;
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.*;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;

private static final String MINIO_ENDPOINT = CloudStorage.MINIO.getEndpoint("http://127.0.0.1:9000");
private static final String BUCKET_NAME = "a-bucket";
private static final String ACCESS_KEY = "minioadmin";
private static final String SECRET_KEY = "minioadmin";

private static final Integer DIM = 512;
private static final Gson GSON_INSTANCE = new Gson();

private static CreateCollectionReq.CollectionSchema createSchema() {
    CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
            .enableDynamicField(true)
            .build();
    schema.addField(AddFieldReq.builder()
            .fieldName("id")
            .dataType(DataType.Int64)
            .isPrimaryKey(Boolean.TRUE)
            .autoID(false)
            .build());
    schema.addField(AddFieldReq.builder()
            .fieldName("bool")
            .dataType(DataType.Bool)
            .build());
    schema.addField(AddFieldReq.builder()
            .fieldName("int8")
            .dataType(DataType.Int8)
            .build());
    schema.addField(AddFieldReq.builder()
            .fieldName("int16")
            .dataType(DataType.Int16)
            .build());
    schema.addField(AddFieldReq.builder()
            .fieldName("int32")
            .dataType(DataType.Int32)
            .build());
    schema.addField(AddFieldReq.builder()
            .fieldName("int64")
            .dataType(DataType.Int64)
            .build());
    schema.addField(AddFieldReq.builder()
            .fieldName("float")
            .dataType(DataType.Float)
            .build());
    schema.addField(AddFieldReq.builder()
            .fieldName("double")
            .dataType(DataType.Double)
            .build());
    schema.addField(AddFieldReq.builder()
            .fieldName("varchar")
            .dataType(DataType.VarChar)
            .maxLength(512)
            .build());
    schema.addField(AddFieldReq.builder()
            .fieldName("json")
            .dataType(DataType.JSON)
            .build());
    schema.addField(AddFieldReq.builder()
            .fieldName("array_int")
            .dataType(DataType.Array)
            .maxCapacity(100)
            .elementType(DataType.Int64)
            .build());
    schema.addField(AddFieldReq.builder()
            .fieldName("array_str")
            .dataType(DataType.Array)
            .maxCapacity(100)
            .elementType(DataType.VarChar)
            .maxLength(128)
            .build());
    schema.addField(AddFieldReq.builder()
            .fieldName("float_vector")
            .dataType(DataType.FloatVector)
            .dimension(DIM)
            .build());
    schema.addField(AddFieldReq.builder()
            .fieldName("binary_vector")
            .dataType(DataType.BinaryVector)
            .dimension(DIM)
            .build());
    schema.addField(AddFieldReq.builder()
            .fieldName("float16_vector")
            .dataType(DataType.Float16Vector)
            .dimension(DIM)
            .build());
    schema.addField(AddFieldReq.builder()
            .fieldName("sparse_vector")
            .dataType(DataType.SparseFloatVector)
            .build());

    return schema;
}
```
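The schema above is only a client-side description. The import job you eventually run targets a collection built from it, so the schema has to be materialized on the server at some point. The snippet below is a minimal sketch of that step; the URI `http://localhost:19530` and the collection name `bulk_demo` are placeholders for this sketch, not values from the original example.

```python
from pymilvus import MilvusClient

# Assumption: a Milvus instance is reachable at this URI.
client = MilvusClient(uri="http://localhost:19530")

# Materialize the schema defined above as the target collection for the import.
# "bulk_demo" is a placeholder collection name.
client.create_collection(collection_name="bulk_demo", schema=schema)
```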
## Set up BulkWriter

```python
from pymilvus.bulk_writer import RemoteBulkWriter

# Third-party constants
ACCESS_KEY="minioadmin"
SECRET_KEY="minioadmin"
BUCKET_NAME="a-bucket"

# Connection parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
    endpoint=MINIO_ADDRESS,
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    bucket_name=BUCKET_NAME,
    secure=False
)

from pymilvus.bulk_writer import BulkFileType
# Use `from pymilvus import BulkFileType`
# when you use pymilvus earlier than 2.4.2

writer = RemoteBulkWriter(
    schema=schema,
    remote_path="/",
    connect_param=conn,
    file_type=BulkFileType.PARQUET
)

print('bulk writer created.')
```

```java
private static RemoteBulkWriter createRemoteBulkWriter(CreateCollectionReq.CollectionSchema collectionSchema) throws IOException {
    StorageConnectParam connectParam = S3ConnectParam.newBuilder()
            .withEndpoint(MINIO_ENDPOINT)
            .withBucketName(BUCKET_NAME)
            .withAccessKey(ACCESS_KEY)
            .withSecretKey(SECRET_KEY)
            .build();
    RemoteBulkWriterParam bulkWriterParam = RemoteBulkWriterParam.newBuilder()
            .withCollectionSchema(collectionSchema)
            .withRemotePath("/")
            .withConnectParam(connectParam)
            .withFileType(BulkFileType.PARQUET)
            .build();
    return new RemoteBulkWriter(bulkWriterParam);
}
```

Once the connection parameters are ready, you can reference them in the **RemoteBulkWriter**, as the Python `writer` object and the Java `createRemoteBulkWriter()` helper above do.
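If you only want to generate the source files on the local filesystem, for example to inspect them before uploading anything, pymilvus also provides a **LocalBulkWriter** with the same `append_row()`/`commit()` API. The snippet below is a minimal sketch assuming pymilvus 2.4 or later; the output directory `'.'` is a placeholder.

```python
from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType

# Writes the generated files to a local directory instead of a remote bucket.
local_writer = LocalBulkWriter(
    schema=schema,              # the schema defined above
    local_path='.',             # placeholder output directory
    file_type=BulkFileType.PARQUET
)
```

For the import workflow on this page, stick with **RemoteBulkWriter**, because the generated files must end up in a bucket that your Milvus deployment can reach.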
For demonstration purposes, the following code appends randomly generated data. Since the schema enables dynamic fields, the rows also include non-schema-defined fields (`dynamic_{i}` in the Python example and `dynamic` in the Java example).

```python
import random, string, json
import numpy as np
import tensorflow as tf  # only needed for the commented-out bfloat16 helper below

def generate_random_str(length=5):
    letters = string.ascii_uppercase
    digits = string.digits

    return ''.join(random.choices(letters + digits, k=length))

# Acceptable input for a binary vector:
# 1. a list of 0/1 integers, such as [1, 0, 1, 1, 0, 0, 1, 0]
# 2. a numpy array of uint8
def gen_binary_vector(to_numpy_arr):
    raw_vector = [random.randint(0, 1) for i in range(DIM)]
    if to_numpy_arr:
        return np.packbits(raw_vector, axis=-1)
    return raw_vector

# Acceptable input for a float vector:
# 1. a list of floats, such as [0.56, 1.859, 6.55, 9.45]
# 2. a numpy array of float32
def gen_float_vector(to_numpy_arr):
    raw_vector = [random.random() for _ in range(DIM)]
    if to_numpy_arr:
        return np.array(raw_vector, dtype="float32")
    return raw_vector

# # Acceptable input for a bfloat16 vector:
# # 1. a list of floats, such as [0.56, 1.859, 6.55, 9.45]
# # 2. a numpy array of bfloat16
# def gen_bf16_vector(to_numpy_arr):
#     raw_vector = [random.random() for _ in range(DIM)]
#     if to_numpy_arr:
#         return tf.cast(raw_vector, dtype=tf.bfloat16).numpy()
#     return raw_vector

# Acceptable input for a float16 vector:
# 1. a list of floats, such as [0.56, 1.859, 6.55, 9.45]
# 2. a numpy array of float16
def gen_fp16_vector(to_numpy_arr):
    raw_vector = [random.random() for _ in range(DIM)]
    if to_numpy_arr:
        return np.array(raw_vector, dtype=np.float16)
    return raw_vector

# Acceptable input for a sparse vector: a dict, either
# {2: 13.23, 45: 0.54} or {"indices": [1, 2], "values": [0.1, 0.2]}
# Note: the keys do not need to be sorted.
def gen_sparse_vector(pair_dict: bool):
    raw_vector = {}
    dim = random.randint(2, 20)
    if pair_dict:
        raw_vector["indices"] = [i for i in range(dim)]
        raw_vector["values"] = [random.random() for _ in range(dim)]
    else:
        for i in range(dim):
            raw_vector[i] = random.random()
    return raw_vector

for i in range(10000):
    writer.append_row({
        "id": np.int64(i),
        "bool": i % 3 == 0,
        "int8": np.int8(i % 128),
        "int16": np.int16(i % 1000),
        "int32": np.int32(i % 100000),
        "int64": np.int64(i),
        "float": np.float32(i / 3),
        "double": np.float64(i / 7),
        "varchar": f"varchar_{i}",
        "json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
        "array_str": np.array([f"str_{k}" for k in range(5)], np.dtype("str")),
        "array_int": np.array([k for k in range(10)], np.dtype("int64")),
        "float_vector": gen_float_vector(True),
        "binary_vector": gen_binary_vector(True),
        "float16_vector": gen_fp16_vector(True),
        # "bfloat16_vector": gen_bf16_vector(True),
        "sparse_vector": gen_sparse_vector(True),
        f"dynamic_{i}": i,  # non-schema-defined field, allowed by enable_dynamic_field
    })
    if (i + 1) % 1000 == 0:
        writer.commit()
        print('committed')

print(writer.batch_files)
```
```java
private static byte[] genBinaryVector() {
    Random ran = new Random();
    // DIM bits packed into DIM / 8 bytes
    int byteCount = DIM / 8;
    ByteBuffer vector = ByteBuffer.allocate(byteCount);
    for (int i = 0; i < byteCount; ++i) {
        vector.put((byte) ran.nextInt(Byte.MAX_VALUE));
    }
    return vector.array();
}

private static List<Float> genFloatVector() {
    Random ran = new Random();
    List<Float> vector = new ArrayList<>();
    for (int i = 0; i < DIM; ++i) {
        vector.add(ran.nextFloat());
    }
    return vector;
}

private static byte[] genFloat16Vector() {
    List<Float> originalVector = genFloatVector();
    return Float16Utils.f32VectorToFp16Buffer(originalVector).array();
}

private static SortedMap<Long, Float> genSparseVector() {
    Random ran = new Random();
    SortedMap<Long, Float> sparse = new TreeMap<>();
    int dim = ran.nextInt(18) + 2; // [2, 20)
    for (int i = 0; i < dim; ++i) {
        sparse.put((long) ran.nextInt(1000000), ran.nextFloat());
    }
    return sparse;
}

private static List<String> genStringArray(int length) {
    List<String> arr = new ArrayList<>();
    for (int i = 0; i < length; i++) {
        arr.add("str_" + i);
    }
    return arr;
}

private static List<Long> genIntArray(int length) {
    List<Long> arr = new ArrayList<>();
    for (long i = 0; i < length; i++) {
        arr.add(i);
    }
    return arr;
}

private static List<List<String>> uploadData() throws Exception {
    CreateCollectionReq.CollectionSchema collectionSchema = createSchema();
    // createRemoteBulkWriter() is defined in the "Set up BulkWriter" section above.
    try (RemoteBulkWriter remoteBulkWriter = createRemoteBulkWriter(collectionSchema)) {
        for (int i = 0; i < 10000; ++i) {
            JsonObject rowObject = new JsonObject();

            rowObject.addProperty("id", i);
            rowObject.addProperty("bool", i % 3 == 0);
            rowObject.addProperty("int8", i % 128);
            rowObject.addProperty("int16", i % 1000);
            rowObject.addProperty("int32", i % 100000);
            rowObject.addProperty("int64", i);
            rowObject.addProperty("float", (float) i / 3);
            rowObject.addProperty("double", (double) i / 7);
            rowObject.addProperty("varchar", "varchar_" + i);
            rowObject.addProperty("json", String.format("{\"dummy\": %s, \"ok\": \"name_%s\"}", i, i));
            rowObject.add("array_str", GSON_INSTANCE.toJsonTree(genStringArray(5)));
            rowObject.add("array_int", GSON_INSTANCE.toJsonTree(genIntArray(10)));
            rowObject.add("float_vector", GSON_INSTANCE.toJsonTree(genFloatVector()));
            rowObject.add("binary_vector", GSON_INSTANCE.toJsonTree(genBinaryVector()));
            rowObject.add("float16_vector", GSON_INSTANCE.toJsonTree(genFloat16Vector()));
            rowObject.add("sparse_vector", GSON_INSTANCE.toJsonTree(genSparseVector()));
            rowObject.addProperty("dynamic", "dynamic_" + i); // non-schema-defined field

            remoteBulkWriter.appendRow(rowObject);

            if ((i + 1) % 1000 == 0) {
                remoteBulkWriter.commit(false);
            }
        }

        List<List<String>> batchFiles = remoteBulkWriter.getBatchFiles();
        System.out.println(batchFiles);
        return batchFiles;
    }
}

public static void main(String[] args) throws Exception {
    List<List<String>> batchFiles = uploadData();
}
```
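Both examples finish by printing the batch file lists (`writer.batch_files` in Python, `getBatchFiles()` in Java): each `commit()` produces one batch, and these paths are what you later hand to the import request. If you want to spot-check the contents of one file at this point, the sketch below downloads the first file of the first batch and loads it with pandas. The `minio`, `pandas`, and `pyarrow` packages, and the exact path handling, are assumptions rather than part of the original example.

```python
from minio import Minio
import pandas as pd

# Assumption: MinIO is reachable at 127.0.0.1:9000 with the constants above.
client = Minio("127.0.0.1:9000", access_key=ACCESS_KEY, secret_key=SECRET_KEY, secure=False)

# writer.batch_files is a list of batches; each batch is a list of file paths.
first_file = writer.batch_files[0][0].lstrip("/")  # adjust if your remote_path differs
client.fget_object(BUCKET_NAME, first_file, "sample.parquet")

df = pd.read_parquet("sample.parquet")
print(df.shape)          # one commit covers 1,000 rows in this example
print(list(df.columns))  # dynamic fields are typically packed into a $meta column
```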
rowObject.addProperty("dynamic", "dynamic_" + i); + + remoteBulkWriter.appendRow(rowObject); + + if ((i+1)%1000 == 0) { + remoteBulkWriter.commit(false); + } + } + + List> batchFiles = remoteBulkWriter.getBatchFiles(); + System.out.println(batchFiles); + return batchFiles; + } catch (Exception e) { + throw e; + } } -// localBulkWriter.commit(false); -remoteBulkWriter.commit(false); +public static void main(String[] args) throws Exception { + List> batchFiles = uploadData(); +} ``` ## Verify the results