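Batch import
Batch imports are an efficient way to add multiple data objects and cross-references.
Additional information
To create a bulk import job, follow these steps:
- Initialize a batch object.
- Add items to the batch object.
- Ensure that the last batch is sent (flushed).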
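The three steps above can be illustrated with a toy batcher in plain Python. This is only a sketch of the general pattern and not the Weaviate client API: the SimpleBatcher class, its send callback, and the batch size of 2 are all hypothetical names and values chosen for illustration.

```python
from typing import Callable, List


class SimpleBatcher:
    """Toy batcher (hypothetical): buffers items and sends them in fixed-size chunks."""

    def __init__(self, send: Callable[[List[dict]], None], batch_size: int = 2):
        self._send = send
        self._batch_size = batch_size
        self._buffer: List[dict] = []

    def add(self, item: dict) -> None:
        # Step 2: add an item; send automatically once the buffer is full
        self._buffer.append(item)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        # Step 3: send whatever is left, even if the buffer is only partly full
        if self._buffer:
            self._send(list(self._buffer))
            self._buffer.clear()

    def __enter__(self) -> "SimpleBatcher":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Leaving the "with" block flushes the final partial batch
        self.flush()
        return False


sent_batches: List[List[dict]] = []

# Step 1: initialize the batch object
with SimpleBatcher(sent_batches.append, batch_size=2) as batcher:
    for i in range(5):
        batcher.add({"title": f"Object {i + 1}"})

print([len(batch) for batch in sent_batches])
```

Using a context manager is one way to make sure the last, partially filled batch is never forgotten, which is the reason the client examples on this page use a with block.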
Basic import
The following example adds objects to the MyCollection collection.
data_rows = [
    {"title": f"Object {i+1}"} for i in range(5)
]
collection = client.collections.use("MyCollection")
with collection.batch.fixed_size(batch_size=200) as batch:
    for data_row in data_rows:
        batch.add_object(
            properties=data_row,
        )
        # Stop early if too many objects fail
        if batch.number_errors > 10:
            print("Batch import stopped due to excessive errors.")
            break
# Inspect the failures after the batch context closes
failed_objects = collection.batch.failed_objects
if failed_objects:
    print(f"Number of failed imports: {len(failed_objects)}")
    print(f"First failed object: {failed_objects[0]}")
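Error handling
During a batch import, any failed objects or references are stored and can be retrieved through batch.failed_objects and batch.failed_references. A running count of failed objects and references is also maintained, and is available through batch.number_errors inside the context manager. You can use this counter to stop the import and investigate the failed objects or references.
Learn more about error handling on the Python client reference page.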
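When an import stops because of errors, it can help to see which failure causes dominate before inspecting individual objects. The sketch below groups failures by message. It assumes each failed entry exposes a message attribute and falls back to the string form of the entry if it does not; the summarize_failures helper and the sample data are hypothetical and only stand in for the contents of batch.failed_objects.

```python
from collections import Counter
from types import SimpleNamespace


def summarize_failures(failed_objects, top_n=3):
    """Return the most common failure messages with their counts."""
    messages = Counter(
        # Assumption: entries carry a `message` attribute; otherwise use str(entry)
        str(getattr(entry, "message", entry))
        for entry in failed_objects
    )
    return messages.most_common(top_n)


# Stand-in data for illustration only
sample_failures = [
    SimpleNamespace(message="invalid property type"),
    SimpleNamespace(message="invalid property type"),
    SimpleNamespace(message="vector length mismatch"),
]

summary = summarize_failures(sample_failures)
for message, count in summary:
    print(f"{count} x {message}")
```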
let dataObjects = [
{ title: 'Object 1' },
{ title: 'Object 2' },
{ title: 'Object 3' }
]
const myCollection = client.collections.use('MyCollection')
const response = await myCollection.data.insertMany(dataObjects);
console.log(response);
className := "MyCollection"
dataObjs := []models.PropertySchema{}
for i := 0; i < 5; i++ {
dataObjs = append(dataObjs, map[string]interface{}{
"title": fmt.Sprintf("Object %v", i),
})
}
batcher := client.Batch().ObjectsBatcher()
for _, dataObj := range dataObjs {
batcher.WithObjects(&models.Object{
Class: className,
Properties: dataObj,
})
}
batcher.Do(ctx)
List<Map<String, Object>> dataRows = new ArrayList<>();
for (int i = 0; i < 5; i++) {
dataRows.add(Map.of("title", "Object " + (i + 1)));
}
var collection = client.collections.use("MyCollection");
var response = collection.data.insertMany(dataRows.toArray(new Map[0]));
if (!response.errors().isEmpty()) {
System.err
.println("Number of failed imports: " + response.errors().size());
System.err.println("First failed object: " + response.errors().get(0));
}
String className = "MyCollection";
List<Map<String, Object>> dataObjs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Map<String, Object> properties = new HashMap<>();
properties.put("title", String.format("Object %s", i));
dataObjs.add(properties);
}
ObjectsBatcher batcher = client.batch().objectsBatcher();
for (Map<String, Object> properties : dataObjs) {
batcher.withObject(WeaviateObject.builder()
.className(className)
.properties(properties)
.build()
);
}
batcher.run();
var dataRows = Enumerable
.Range(0, 5)
.Select(i => new { title = $"Object {i + 1}" })
.ToList();
var collection = client.Collections.Use("MyCollection");
var response = await collection.Data.InsertMany(dataRows);
var failedObjects = response.Where(r => r.Error != null).ToList();
if (failedObjects.Any())
{
Console.WriteLine($"Number of failed imports: {failedObjects.Count}");
Console.WriteLine($"First failed object: {failedObjects.First().Error}");
}
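Server-side batching
Server-side batching was added in v1.34 as a preview.
This means the feature is still under development and may change in future releases, including potential breaking changes. We do not recommend using this feature in production environments at this time.
The following example uses server-side batch imports to add objects to a collection named MyCollection. The client sends data in batch sizes that are based on feedback from the server.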
data_rows = [
    {"title": f"Object {i+1}"} for i in range(5)
]
collection = client.collections.get("MyCollection")
with collection.batch.experimental() as batch:
    for data_row in data_rows:
        batch.add_object(
            properties=data_row,
        )
        # Stop early if too many objects fail
        if batch.number_errors > 10:
            print("Batch import stopped due to excessive errors.")
            break
failed_objects = collection.batch.failed_objects
if failed_objects:
    print(f"Number of failed imports: {len(failed_objects)}")
    print(f"First failed object: {failed_objects[0]}")
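Use the gRPC API
The gRPC API is faster than the REST API. Use the gRPC API to improve import speeds.
The Python client uses gRPC by default.
The legacy Python client does not support gRPC.
The TypeScript client v3 uses gRPC by default.
The legacy TypeScript client does not support gRPC.
To use the gRPC API with the Java client, add the setGRPCHost field to your client connection code. Update setGRPCSecured if you use an encrypted connection.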
Config config = new Config("http", "localhost:8080");
config.setGRPCSecured(false);
config.setGRPCHost("localhost:50051");
To use the gRPC API with the Go client, add the GrpcConfig field to your client connection code. Update Secured if you use an encrypted connection.
cfg := weaviate.Config{
    Host:   fmt.Sprintf("localhost:%v", "8080"),
    Scheme: "http",
    GrpcConfig: &grpc.Config{
        Host:    "localhost:50051",
        Secured: false,
    },
}
client, err := weaviate.NewClient(cfg)
if err != nil {
    // Handle the connection error as appropriate for your application
    panic(err)
}
To use the gRPC API with the Spark connector, add the grpc:host field to your client connection code. Update grpc:secured if you use an encrypted connection.
df.write
.format("io.weaviate.spark.Weaviate")
.option("scheme", "http")
.option("host", "localhost:8080")
.option("grpc:host", "localhost:50051")
.option("grpc:secured", "false")
.option("className", className)
.mode("append")
.save()
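Specify an ID value
Weaviate generates a UUID for each object. Object IDs must be unique. If you set object IDs, use one of these deterministic UUID methods to prevent duplicate IDs:
- generate_uuid5 (Python)
- generateUuid5 (TypeScript)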
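To see why a deterministic UUID prevents duplicates, the sketch below derives a version 5 UUID from an object's properties using only the Python standard library. It is an illustration of the idea and is not the client helper: the deterministic_id function is hypothetical, the namespace is an arbitrary choice, and the resulting values should not be expected to match those produced by generate_uuid5.

```python
import json
import uuid


def deterministic_id(properties: dict, namespace: uuid.UUID = uuid.NAMESPACE_DNS) -> str:
    """Derive a stable UUIDv5 from the object's properties."""
    # Sort keys so that the same properties always serialize to the same string
    canonical = json.dumps(properties, sort_keys=True)
    return str(uuid.uuid5(namespace, canonical))


first = deterministic_id({"title": "Object 1", "body": "Body 1"})
again = deterministic_id({"body": "Body 1", "title": "Object 1"})
other = deterministic_id({"title": "Object 2", "body": "Body 2"})

print(first == again)  # same content, same ID
print(first == other)  # different content, different ID
```

Because the ID depends only on the content, importing the same row twice targets the same object ID instead of creating a second object with a random ID.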
from weaviate.util import generate_uuid5
data_rows = [{"title": f"Object {i+1}"} for i in range(5)]
collection = client.collections.use("MyCollection")
with collection.batch.fixed_size(batch_size=200) as batch:
    for data_row in data_rows:
        # Derive the ID from the object content so re-imports do not create duplicates
        obj_uuid = generate_uuid5(data_row)
        batch.add_object(
            properties=data_row,
            uuid=obj_uuid
        )
        if batch.number_errors > 10:
            print("Batch import stopped due to excessive errors.")
            break
failed_objects = collection.batch.failed_objects
if failed_objects:
    print(f"Number of failed imports: {len(failed_objects)}")
    print(f"First failed object: {failed_objects[0]}")
import { generateUuid5 } from 'weaviate-client';
let dataObjects = [
{
properties: { title: 'Object 1' },
id: generateUuid5('MyCollection', 'Object 1'),
},
{
properties: { title: 'Object 2' },
id: generateUuid5('MyCollection', 'Object 2'),
},
]
const myCollection = client.collections.use('MyCollection')
await myCollection.data.insertMany(dataObjects)
generateUUID := func(input string) strfmt.UUID {
input = strings.ToLower(input)
hash := md5.Sum([]byte(input))
uuid := fmt.Sprintf("%x-%x-%x-%x-%x", hash[0:4], hash[4:6], hash[6:8], hash[8:10], hash[10:])
return strfmt.UUID(uuid)
}
className := "MyCollection"
dataObjs := []models.PropertySchema{}
for i := 0; i < 5; i++ {
dataObjs = append(dataObjs, map[string]interface{}{
"title": fmt.Sprintf("Object %v", i),
})
}
batcher := client.Batch().ObjectsBatcher()
for _, dataObj := range dataObjs {
batcher.WithObjects(&models.Object{
Class: className,
Properties: dataObj,
ID: generateUUID((dataObj.(map[string]interface{}))["title"].(string)),
})
}
batcher.Do(ctx)
List<WeaviateObject<Map<String, Object>>> dataObjects = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Map<String, Object> dataRow = Map.of("title", "Object " + (i + 1));
UUID objUuid = generateUuid5(dataRow.toString());
dataObjects.add(WeaviateObject.<Map<String, Object>>of(
obj -> obj.properties(dataRow).uuid(objUuid.toString())));
}
var collection = client.collections.use("MyCollection");
var response = collection.data.insertMany(dataObjects);
if (!response.errors().isEmpty()) {
System.err
.println("Number of failed imports: " + response.errors().size());
System.err.println("First failed object: " + response.errors().get(0));
}
String className = "MyCollection";
List<Map<String, Object>> dataObjs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Map<String, Object> properties = new HashMap<>();
properties.put("title", String.format("Object %s", i));
dataObjs.add(properties);
}
ObjectsBatcher batcher = client.batch().objectsBatcher();
for (Map<String, Object> properties : dataObjs) {
batcher.withObject(WeaviateObject.builder()
.className(className)
.properties(properties)
.id(UUID.nameUUIDFromBytes(((String) properties.get("title")).getBytes()).toString())
.build()
);
}
batcher.run();
var dataToInsert = new List<BatchInsertRequest>();
var vectorData = Enumerable.Repeat(0.1f, 10).ToArray();
for (int i = 0; i < 5; i++)
{
var dataRow = new { title = $"Object {i + 1}" };
var objUuid = GenerateUuid5(JsonSerializer.Serialize(dataRow));
var vectors = new Vectors { { "default", vectorData } };
dataToInsert.Add(
BatchInsertRequest.Create(data: dataRow, id: objUuid, vectors: vectors)
);
}
var collection = client.Collections.Use("MyCollection");
var response = await collection.Data.InsertMany(dataToInsert);
var failedObjects = response.Where(r => r.Error != null).ToList();
if (failedObjects.Any())
{
Console.WriteLine($"Number of failed imports: {failedObjects.Count}");
Console.WriteLine($"First failed object: {failedObjects.First().Error}");
}
Specify a vector
Use the vector property to specify a vector for each object.
data_rows = [{"title": f"Object {i+1}"} for i in range(5)]
vectors = [[0.1] * 1536 for i in range(5)]
collection = client.collections.use("MyCollection")
with collection.batch.fixed_size(batch_size=200) as batch:
    for i, data_row in enumerate(data_rows):
        batch.add_object(
            properties=data_row,
            vector=vectors[i]
        )
        if batch.number_errors > 10:
            print("Batch import stopped due to excessive errors.")
            break
failed_objects = collection.batch.failed_objects
if failed_objects:
    print(f"Number of failed imports: {len(failed_objects)}")
    print(f"First failed object: {failed_objects[0]}")
const myCollection = client.collections.use('MyCollection')
let dataObjects = [
{
properties: { title: 'Object 1' },
vectors: Array(100).fill(0.1111),
},
{
properties: { title: 'Object 2' },
vectors: Array(100).fill(0.2222),
},
]
await myCollection.data.insertMany(dataObjects)
className := "MyCollection"
dataObjs := []models.PropertySchema{}
for i := 0; i < 5; i++ {
dataObjs = append(dataObjs, map[string]interface{}{
"title": fmt.Sprintf("Object %v", i),
})
}
vectors := [][]float32{}
for i := 0; i < 5; i++ {
vector := make([]float32, 10)
for j := range vector {
vector[j] = 0.25 + float32(j)/100
}
vectors = append(vectors, vector)
}
batcher := client.Batch().ObjectsBatcher()
for i, dataObj := range dataObjs {
batcher.WithObjects(&models.Object{
Class: className,
Properties: dataObj,
Vector: vectors[i],
})
}
batcher.Do(ctx)
List<WeaviateObject<Map<String, Object>>> dataObjects = new ArrayList<>();
float[] vector = new float[10];
Arrays.fill(vector, 0.1f);
for (int i = 0; i < 5; i++) {
Map<String, Object> dataRow = Map.of("title", "Object " + (i + 1));
UUID objUuid = generateUuid5(dataRow.toString());
dataObjects.add(
WeaviateObject.<Map<String, Object>>of(obj -> obj.properties(dataRow)
.uuid(objUuid.toString())
.vectors(Vectors.of(vector))));
}
var collection = client.collections.use("MyCollection");
var response = collection.data.insertMany(dataObjects);
if (!response.errors().isEmpty()) {
System.err
.println("Number of failed imports: " + response.errors().size());
System.err.println("First failed object: " + response.errors().get(0));
}
String className = "MyCollection";
List<Map<String, Object>> dataObjs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Map<String, Object> properties = new HashMap<>();
properties.put("title", String.format("Object %s", i));
dataObjs.add(properties);
}
List<Float[]> vectors = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Float[] vector = new Float[10];
Arrays.fill(vector, 0.25f + i / 100f);
vectors.add(vector);
}
ObjectsBatcher batcher = client.batch().objectsBatcher();
for (int i = 0; i < 5; i++) {
batcher.withObject(WeaviateObject.builder()
.className(className)
.properties(dataObjs.get(i))
.vector(vectors.get(i))
.build()
);
}
batcher.run();
var dataToInsert = new List<BatchInsertRequest>();
var vectorData = Enumerable.Repeat(0.1f, 10).ToArray();
for (int i = 0; i < 5; i++)
{
var dataRow = new { title = $"Object {i + 1}" };
var objUuid = GenerateUuid5(JsonSerializer.Serialize(dataRow));
var vectors = new Vectors { { "default", vectorData } };
dataToInsert.Add(
BatchInsertRequest.Create(data: dataRow, id: objUuid, vectors: vectors)
);
}
var collection = client.Collections.Use("MyCollection");
var response = await collection.Data.InsertMany(dataToInsert);
if (response.HasErrors)
{
Console.WriteLine($"Number of failed imports: {response.Errors.Count()}");
Console.WriteLine($"First failed object: {response.Errors.First().Message}");
}
Specify named vectors
When you create an object, you can specify named vectors (if configured in your collection).
data_rows = [{
    "title": f"Object {i+1}",
    "body": f"Body {i+1}"
} for i in range(5)]
title_vectors = [[0.12] * 1536 for _ in range(5)]
body_vectors = [[0.34] * 1536 for _ in range(5)]
collection = client.collections.use("MyCollection")
with collection.batch.fixed_size(batch_size=200) as batch:
    for i, data_row in enumerate(data_rows):
        batch.add_object(
            properties=data_row,
            # Map each named vector to its values
            vector={
                "title": title_vectors[i],
                "body": body_vectors[i],
            }
        )
        if batch.number_errors > 10:
            print("Batch import stopped due to excessive errors.")
            break
failed_objects = collection.batch.failed_objects
if failed_objects:
    print(f"Number of failed imports: {len(failed_objects)}")
    print(f"First failed object: {failed_objects[0]}")
const myCollection = client.collections.use("MyCollection")
let dataObjects = [
{
properties: { title: 'Object 1' },
vectors: {
title: Array(100).fill(0.1111),
body: Array(100).fill(0.9999),
}
},
{
properties: { title: 'Object 2' },
vectors: {
title: Array(100).fill(0.2222),
body: Array(100).fill(0.8888),
}
},
]
await myCollection.data.insertMany(dataObjects)
List<Map<String, Object>> dataRows = new ArrayList<>();
List<float[]> titleVectors = new ArrayList<>();
List<float[]> bodyVectors = new ArrayList<>();
for (int i = 0; i < 5; i++) {
dataRows
.add(Map.of("title", "Object " + (i + 1), "body", "Body " + (i + 1)));
float[] titleVector = new float[1536];
Arrays.fill(titleVector, 0.12f);
titleVectors.add(titleVector);
float[] bodyVector = new float[1536];
Arrays.fill(bodyVector, 0.34f);
bodyVectors.add(bodyVector);
}
CollectionHandle<Map<String, Object>> collection =
client.collections.use("MyCollection");
List<WeaviateObject<Map<String, Object>>> objectsToInsert =
new ArrayList<>();
for (int i = 0; i < dataRows.size(); i++) {
int index = i;
objectsToInsert.add(
WeaviateObject
.<Map<String, Object>>of(v -> v.properties(dataRows.get(index))
.vectors(Vectors.of("title", titleVectors.get(index)))
.vectors(Vectors.of("body", bodyVectors.get(index)))));
}
InsertManyResponse response = collection.data.insertMany(objectsToInsert);
if (!response.errors().isEmpty()) {
System.err.printf("Number of failed imports: %d\n",
response.errors().size());
System.err.printf("First failed object error: %s\n",
response.errors().get(0));
}
var dataToInsert = new List<BatchInsertRequest>();
for (int i = 0; i < 5; i++)
{
var dataRow = new { title = $"Object {i + 1}", body = $"Body {i + 1}" };
var titleVector = Enumerable.Repeat(0.12f, 1536).ToArray();
var bodyVector = Enumerable.Repeat(0.34f, 1536).ToArray();
var namedVectors = new Vectors { { "title", titleVector }, { "body", bodyVector } };
dataToInsert.Add(BatchInsertRequest.Create(dataRow, vectors: namedVectors));
}
var collection = client.Collections.Use("MyCollection");
var response = await collection.Data.InsertMany(dataToInsert);
if (response.HasErrors)
{
Console.WriteLine($"Number of failed imports: {response.Errors.Count()}");
Console.WriteLine($"First failed object error: {response.Errors.First().Message}");
}
Import with references
You can batch create links from one object to another through cross-references.
collection = client.collections.use("Author")
with collection.batch.fixed_size(batch_size=100) as batch:
    batch.add_reference(
        from_property="writesFor",
        from_uuid=from_uuid,
        to=target_uuid,
    )
failed_references = collection.batch.failed_references
if failed_references:
    print(f"Number of failed imports: {len(failed_references)}")
    print(f"First failed reference: {failed_references[0]}")
var collection = client.collections.use("Author");
var response = collection.data
.referenceAddMany(BatchReference.uuids(from, "writesFor", targetUuid));
if (!response.errors().isEmpty()) {
System.err
.println("Number of failed imports: " + response.errors().size());
System.err.println("First failed object: " + response.errors().get(0));
}
var collection = client.Collections.Use("Author");
var response = await collection.Data.ReferenceAddMany([
new DataReference(fromUuid, "writesFor", targetUuid),
]);
if (response.HasErrors)
{
Console.WriteLine($"Number of failed imports: {response.Errors.Count}");
Console.WriteLine($"First failed object: {response.Errors.First()}");
}
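Python-specific considerations
The Python client has built-in batching methods to help you optimize import speed. For details, see the client documentation.
Async Python client and batching
Currently, the async Python client does not support batching. To use batching, use the sync Python client.
Stream data from large files
If your dataset is large, consider streaming the import to avoid out-of-memory issues.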
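The core idea of streaming is to read records lazily and hand them over in small groups, so that only one group is in memory at a time. The following is a minimal, library-independent sketch of that idea; the chunked helper and the fake_rows generator are hypothetical names used only for illustration, and a real import would read rows from a file instead.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of at most `size` items without materializing the whole input."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def fake_rows(count: int) -> Iterator[dict]:
    # Stand-in for a lazy file reader; rows are produced one at a time
    for i in range(count):
        yield {"question": f"Q{i}", "answer": f"A{i}"}


chunk_sizes = [len(chunk) for chunk in chunked(fake_rows(5), 2)]
print(chunk_sizes)
```

Note that the final chunk can be smaller than the requested size, which mirrors the need to send the last partial batch during an import.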
To try the example code, download the sample data and create the sample input files.
Get the sample data
import json
import requests
# Download the sample data set
response = requests.get(
    "https://raw.githubusercontent.com/weaviate-tutorials/intro-workshop/main/data/jeopardy_1k.json"
)
data = response.json()
# Save the data to a local JSON file
with open('jeopardy_1k.json', 'w') as f:
    json.dump(data, f)
import fs from 'fs';
// Uncomment these imports to save as csv
// import { mkConfig, generateCsv, asString } from "export-to-csv";
// import { writeFile } from "node:fs";
// import { Buffer } from "node:buffer";
// Get the data file
async function getJsonData() {
const file = await fetch(
'https://raw.githubusercontent.com/weaviate-tutorials/intro-workshop/main/data/jeopardy_1k.json'
);
const data = await file.json()
// Save as json
fs.writeFile("jeopardy_1k.json", JSON.stringify(data), function(err) {
if (err) {
console.log(err);
}
});
// // Uncomment this section to save as csv
// const csvConfig = mkConfig({ useKeysAsHeaders: true, filename: "jeopardy_1k" });
// const csv = generateCsv(csvConfig)(data);
// const filename = `${csvConfig.filename}.csv`;
// const csvBuffer = new Uint8Array(Buffer.from(asString(csv)));
// // Write the csv file to disk
// writeFile(filename, csvBuffer, (err) => {
// if (err) throw err;
// console.log("file saved: ", filename);
// });
}
await getJsonData();
Stream JSON files example code
import ijson
# Settings for displaying the import progress
counter = 0
interval = 200
print("JSON streaming, to avoid running out of memory on large files...")
with client.batch.fixed_size(batch_size=100) as batch:
    with open("jeopardy_1k.json", "rb") as f:
        # Parse the top-level JSON array one element at a time
        objects = ijson.items(f, "item")
        for obj in objects:
            properties = {
                "question": obj["Question"],
                "answer": obj["Answer"],
            }
            batch.add_object(
                collection="JeopardyQuestion",
                properties=properties,
            )
            counter += 1
            if counter % interval == 0:
                print(f"Imported {counter} articles...")
print(f"Finished importing {counter} articles.")
import weaviate from 'weaviate-client';
import fs from 'fs';
import parser from 'stream-json';
import StreamArray from 'stream-json/streamers/StreamArray';
import Chain from 'stream-chain';
let batcher = client.batch.objectsBatcher();
let counter = 0;
const batchSize = 20;
async function addObject(obj: object): Promise<void> {
const properties = {
question: obj['Question'],
answer: obj['Answer'],
};
batcher = batcher.withObject({
class: 'JeopardyQuestion',
properties,
});
counter++;
if (counter % batchSize === 0) {
const response = await batcher.do();
batcher = client.batch.objectsBatcher();
for (const r of response)
if (r.result.errors)
throw r.result.errors;
console.log(`Imported ${counter} articles...`);
}
}
async function importJson(filePath) {
const pipeline = new Chain([
fs.createReadStream(filePath),
parser(),
new StreamArray(),
]);
for await (const { value } of pipeline) {
await addObject(value);
}
}
await importJson('jeopardy_1k.json');
if (batcher.payload().objects.length > 0)
await batcher.do();
console.log(`Finished importing ${counter} articles.`);
int batchSize = 100;
List<Map<String, Object>> batch = new ArrayList<>(batchSize);
var collection = client.collections.use("JeopardyQuestion");
Gson gson = new Gson();
System.out.println(
"JSON streaming, to avoid running out of memory on large files...");
try (JsonReader reader =
new JsonReader(new FileReader("jeopardy_1k.json"))) {
reader.beginArray();
while (reader.hasNext()) {
Map<String, String> obj = gson.fromJson(reader, Map.class);
Map<String, Object> properties = new HashMap<>();
properties.put("question", obj.get("Question"));
properties.put("answer", obj.get("Answer"));
batch.add(properties);
if (batch.size() == batchSize) {
collection.data.insertMany(batch.toArray(new Map[0]));
System.out.println("Imported " + batch.size() + " articles...");
batch.clear();
}
}
reader.endArray();
}
if (!batch.isEmpty()) {
collection.data.insertMany(batch.toArray(new Map[0]));
System.out.println("Imported remaining " + batch.size() + " articles...");
}
System.out.println("Finished importing articles.");
int batchSize = 100;
var batch = new List<object>(batchSize);
var collection = client.Collections.Use("JeopardyQuestion");
Console.WriteLine("JSON streaming, to avoid running out of memory on large files...");
using var fileStream = File.OpenRead(JsonDataFile);
var jsonObjects = JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(fileStream);
await foreach (var obj in jsonObjects)
{
if (obj.ValueKind == JsonValueKind.Null || obj.ValueKind == JsonValueKind.Undefined)
continue;
var properties = new
{
question = obj.GetProperty("Question").ToString(),
answer = obj.GetProperty("Answer").ToString(),
};
batch.Add(properties);
if (batch.Count == batchSize)
{
await collection.Data.InsertMany(batch);
Console.WriteLine($"Imported {batch.Count} articles...");
batch.Clear();
}
}
if (batch.Any())
{
await collection.Data.InsertMany(batch);
Console.WriteLine($"Imported remaining {batch.Count} articles...");
}
Console.WriteLine("Finished importing articles.");
Stream CSV files example code
import pandas as pd
# Settings for displaying the import progress
counter = 0
interval = 200
print("pandas dataframe iterator with lazy-loading, to not load all records in RAM at once...")
with client.batch.fixed_size(batch_size=200) as batch:
    with pd.read_csv(
        "jeopardy_1k.csv",
        usecols=["Question", "Answer", "Category"],
        chunksize=100,  # number of rows per chunk
    ) as csv_iterator:
        # Iterate through the dataframe chunks and add each CSV record to the batch
        for chunk in csv_iterator:
            for index, row in chunk.iterrows():
                properties = {
                    "question": row["Question"],
                    "answer": row["Answer"],
                }
                batch.add_object(
                    collection="JeopardyQuestion",
                    properties=properties,
                )
                # Count every row so the progress message is accurate
                counter += 1
                if counter % interval == 0:
                    print(f"Imported {counter} articles...")
print(f"Finished importing {counter} articles.")
import weaviate from 'weaviate-client';
import fs from 'fs';
import csv from 'csv-parser';
let batcher = client.batch.objectsBatcher();
let counter = 0;
const batchSize = 20;
async function addObject(obj: object): Promise<void> {
const properties = {
question: obj['Question'],
answer: obj['Answer'],
};
batcher = batcher.withObject({
class: 'JeopardyQuestion',
properties,
});
counter++;
if (counter % batchSize === 0) {
const response = await batcher.do();
batcher = client.batch.objectsBatcher();
for (const r of response)
if (r.result.errors)
throw r.result.errors;
console.log(`Imported ${counter} articles...`);
}
}
async function importCSV(filePath) {
const stream = fs.createReadStream(filePath).pipe(csv());
for await (const row of stream) {
await addObject(row);
}
}
await importCSV('jeopardy_1k.csv');
if (batcher.payload().objects.length > 0)
await batcher.do();
console.log(`Finished importing ${counter} articles.`);
int batchSize = 100;
List<Map<String, Object>> batch = new ArrayList<>(batchSize);
var collection = client.collections.use("JeopardyQuestion");
System.out
.println("CSV streaming to not load all records in RAM at once...");
try (BufferedReader csvReader =
new BufferedReader(new FileReader("jeopardy_1k.csv"))) {
String line = csvReader.readLine();
while ((line = csvReader.readLine()) != null) {
String[] data = line.split("\",\"");
Map<String, Object> properties = new HashMap<>();
properties.put("question", data[0].substring(1));
properties.put("answer", data[1].substring(0, data[1].length() - 1));
batch.add(properties);
if (batch.size() == batchSize) {
collection.data.insertMany(batch.toArray(new Map[0]));
System.out.println("Imported " + batch.size() + " articles...");
batch.clear();
}
}
}
if (!batch.isEmpty()) {
collection.data.insertMany(batch.toArray(new Map[0]));
System.out.println("Imported remaining " + batch.size() + " articles...");
}
System.out.println("Finished importing articles.");
int batchSize = 100;
var batch = new List<object>(batchSize);
var collection = client.Collections.Use("JeopardyQuestion");
Console.WriteLine("CSV streaming to not load all records in RAM at once...");
using (var reader = new StreamReader(CsvDataFile))
using (var csv = new CsvReader(reader, CultureInfo.InvariantCulture))
{
var records = csv.GetRecords<JeopardyQuestion>();
foreach (var record in records)
{
var properties = new { question = record.Question, answer = record.Answer };
batch.Add(properties);
if (batch.Count == batchSize)
{
await collection.Data.InsertMany(batch);
Console.WriteLine($"Imported {batch.Count} articles...");
batch.Clear();
}
}
}
if (batch.Any())
{
await collection.Data.InsertMany(batch);
Console.WriteLine($"Imported remaining {batch.Count} articles...");
}
Console.WriteLine("Finished importing articles.");
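Batch vectorization
Some model providers offer batch vectorization APIs, where each request can include multiple objects.
Starting with Weaviate v1.25.0, batch imports automatically make use of the model providers' batch vectorization APIs where available. This reduces the number of requests to the model provider, which improves throughput.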
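The effect on request counts is simple arithmetic. As a rough sketch, the snippet below compares one request per object with requests that carry several objects each. The figure of 100 objects per request is an assumption chosen for illustration, not a documented limit of any provider, and vectorization_requests is a hypothetical helper.

```python
import math


def vectorization_requests(num_objects: int, objects_per_request: int) -> int:
    """Number of provider requests needed to vectorize num_objects objects."""
    if objects_per_request < 1:
        raise ValueError("objects_per_request must be at least 1")
    return math.ceil(num_objects / objects_per_request)


# One object per request versus an assumed 100 objects per request
one_by_one = vectorization_requests(1000, 1)
batched = vectorization_requests(1000, 100)

print(f"Without batching: {one_by_one} requests")
print(f"With batching: {batched} requests")
```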
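Model provider configuration
You can configure batch vectorization settings for each model provider, such as requests per minute or tokens per minute. The following example sets rate limits for the Cohere and OpenAI integrations and provides API keys for both.
Note that each provider exposes different configuration options.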
from weaviate.classes.config import Integrations
integrations = [
    # Each model provider may expose different parameters
    Integrations.cohere(
        api_key=cohere_key,
        requests_per_minute_embeddings=rpm_embeddings,
    ),
    Integrations.openai(
        api_key=openai_key,
        requests_per_minute_embeddings=rpm_embeddings,
        tokens_per_minute_embeddings=tpm_embeddings,
    ),
]
client.integrations.configure(integrations)
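Additional considerations
Data imports can be resource intensive. Consider the following when you import large amounts of data.
Asynchronous imports
Available starting in v1.22. This is an experimental feature. Use with caution.
To maximize import speed, enable asynchronous indexing.
To enable asynchronous indexing, set the ASYNC_INDEXING environment variable to true in your Weaviate configuration file.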
weaviate:
  image: cr.weaviate.io/semitechnologies/weaviate:1.35.3
  ...
  environment:
    ASYNC_INDEXING: 'true'
    ...
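Automatically add new tenants
By default, Weaviate returns an error if you try to insert an object into a non-existent tenant. To change this behavior so that Weaviate creates the new tenant, set autoTenantCreation to true in the collection definition.
The auto-tenant feature is available for batch imports starting in v1.25.0, and for single object insertions starting in v1.25.2.
You can set autoTenantCreation when you create the collection, or reconfigure the collection to update the setting as needed.
Automatic tenant creation is useful when you import a large number of objects. Be cautious if your data is likely to have small inconsistencies or typos. For example, the names TenantOne, tenantOne, and TenntOne create three different tenants.
For details, see auto-tenant.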
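One way to guard against near-duplicate tenant names is to check the names in your data against a list of expected tenants before you import. The sketch below uses fuzzy matching from the Python standard library. The find_suspicious_tenants helper, the 0.8 similarity cutoff, and the sample names are assumptions for illustration; this check runs entirely on the client side and is not a Weaviate feature.

```python
import difflib
from typing import Dict, Iterable, List


def find_suspicious_tenants(
    candidates: Iterable[str],
    known_tenants: List[str],
    cutoff: float = 0.8,
) -> Dict[str, str]:
    """Map each name that looks like a misspelled known tenant to that tenant."""
    # Compare in lowercase so that case differences also count as near matches
    known_by_lower = {name.lower(): name for name in known_tenants}
    suspicious: Dict[str, str] = {}
    for name in candidates:
        if name in known_tenants:
            continue  # exact match, nothing to flag
        matches = difflib.get_close_matches(
            name.lower(), list(known_by_lower), n=1, cutoff=cutoff
        )
        if matches:
            suspicious[name] = known_by_lower[matches[0]]
    return suspicious


flagged = find_suspicious_tenants(
    ["TenantOne", "tenantOne", "TenntOne", "Acme"],
    known_tenants=["TenantOne"],
)
for name, intended in flagged.items():
    print(f"'{name}' looks like a variant of '{intended}'")
```

Names that are flagged can be reviewed or corrected before the import, while genuinely new tenants such as Acme in this example pass through and can be created automatically.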
Further resources
Questions and feedback
If you have any questions or feedback, let us know in the user forum.