跳至主要内容
前往文档
⌘U
Weaviate 数据库

使用 Weaviate 的 APIs 和工具开发 AI 应用

部署

部署、配置和维护 Weaviate 数据库

Weaviate Agents

使用 Weaviate 构建和部署智能代理

Weaviate Cloud

在云端管理和扩展 Weaviate

更多资源

集成
贡献者指南
活动 & 工作坊
Weaviate Academy

需要帮助?

Weaviate Logo询问 AI 助手⌘K
社区论坛

批量导入

批量导入批量导入是一种高效地添加多个数据对象和交叉引用的方法。

其他信息

要创建批量导入作业,请按照以下步骤操作

  1. 初始化一个批量对象。
  2. 将项目添加到批量对象。
  3. 确保发送(刷新)最后一个批量。

基本导入

以下示例将对象添加到MyCollection集合。

py docs  API 文档
更多信息文档中的代码片段反映了最新的客户端库和 Weaviate 数据库版本。请查看 发行说明 以获取特定版本。

如果某个片段无法工作或您有任何反馈,请打开一个 GitHub issue
data_rows = [
{"title": f"Object {i+1}"} for i in range(5)
]

collection = client.collections.use("MyCollection")

with collection.batch.fixed_size(batch_size=200) as batch:
for data_row in data_rows:
batch.add_object(
properties=data_row,
)
if batch.number_errors > 10:
print("Batch import stopped due to excessive errors.")
break

failed_objects = collection.batch.failed_objects
if failed_objects:
print(f"Number of failed imports: {len(failed_objects)}")
print(f"First failed object: {failed_objects[0]}")

错误处理

在批量导入期间,任何失败的对象或引用都将被存储,并且可以通过batch.failed_objectsbatch.failed_references获得。 此外,还会维护失败对象和引用的运行计数,并且可以通过在上下文管理器中访问batch.number_errors来访问。 可以使用此计数器停止导入过程,以便调查失败的对象或引用。

在 Python 客户端参考页面了解更多关于错误处理的信息。

服务器端批量

预览

服务器端批处理是在 v1.34 中作为预览版添加的。

这意味着该功能仍在开发中,未来版本可能会发生变化,包括潜在的破坏性更改。目前,我们不建议在生产环境中使用此功能。

以下是如何使用服务器端批量导入将对象导入到名为MyCollection的集合中。 客户端将根据服务器的反馈以批量大小发送数据。

py docs  API 文档
更多信息文档中的代码片段反映了最新的客户端库和 Weaviate 数据库版本。请查看 发行说明 以获取特定版本。

如果某个片段无法工作或您有任何反馈,请打开一个 GitHub issue
data_rows = [
{"title": f"Object {i+1}"} for i in range(5)
]

collection = client.collections.get("MyCollection")

# Use `experimental` for server-side batching. The client will send data
# in batches at a rate specified by the server.
with collection.batch.experimental() as batch:
for data_row in data_rows:
batch.add_object(
properties=data_row,
)
if batch.number_errors > 10:
print("Batch import stopped due to excessive errors.")
break

failed_objects = collection.batch.failed_objects
if failed_objects:
print(f"Number of failed imports: {len(failed_objects)}")
print(f"First failed object: {failed_objects[0]}")

使用 gRPC API

v1.23 中添加。

gRPC API比 REST API 更快。 使用 gRPC API 以提高导入速度。

py docs  API 文档
更多信息文档中的代码片段反映了最新的客户端库和 Weaviate 数据库版本。请查看 发行说明 以获取特定版本。

如果某个片段无法工作或您有任何反馈,请打开一个 GitHub issue

Python 客户端默认使用 gRPC。


旧版 Python 客户端不支持 gRPC。

指定 ID 值

Weaviate 为每个对象生成 UUID。 对象 ID 必须是唯一的。 如果您设置对象 ID,请使用以下确定性 UUID 方法之一以防止重复 ID

  • generate_uuid5 (Python)
  • generateUuid5 (TypeScript)
py docs  API 文档
更多信息文档中的代码片段反映了最新的客户端库和 Weaviate 数据库版本。请查看 发行说明 以获取特定版本。

如果某个片段无法工作或您有任何反馈,请打开一个 GitHub issue
from weaviate.util import generate_uuid5  # Generate a deterministic ID

data_rows = [{"title": f"Object {i+1}"} for i in range(5)]

collection = client.collections.use("MyCollection")

with collection.batch.fixed_size(batch_size=200) as batch:
for data_row in data_rows:
obj_uuid = generate_uuid5(data_row)
batch.add_object(
properties=data_row,
uuid=obj_uuid
)
if batch.number_errors > 10:
print("Batch import stopped due to excessive errors.")
break

failed_objects = collection.batch.failed_objects
if failed_objects:
print(f"Number of failed imports: {len(failed_objects)}")
print(f"First failed object: {failed_objects[0]}")

指定向量

使用 vector 属性为每个对象指定向量。

py docs  API 文档
更多信息文档中的代码片段反映了最新的客户端库和 Weaviate 数据库版本。请查看 发行说明 以获取特定版本。

如果某个片段无法工作或您有任何反馈,请打开一个 GitHub issue
data_rows = [{"title": f"Object {i+1}"} for i in range(5)]
vectors = [[0.1] * 1536 for i in range(5)]

collection = client.collections.use("MyCollection")

with collection.batch.fixed_size(batch_size=200) as batch:
for i, data_row in enumerate(data_rows):
batch.add_object(
properties=data_row,
vector=vectors[i]
)
if batch.number_errors > 10:
print("Batch import stopped due to excessive errors.")
break

failed_objects = collection.batch.failed_objects
if failed_objects:
print(f"Number of failed imports: {len(failed_objects)}")
print(f"First failed object: {failed_objects[0]}")

指定命名向量

新增于 v1.24

创建对象时,可以指定命名向量(如果在您的集合中配置)。

py docs  API 文档
更多信息文档中的代码片段反映了最新的客户端库和 Weaviate 数据库版本。请查看 发行说明 以获取特定版本。

如果某个片段无法工作或您有任何反馈,请打开一个 GitHub issue
data_rows = [{
"title": f"Object {i+1}",
"body": f"Body {i+1}"
} for i in range(5)]

title_vectors = [[0.12] * 1536 for _ in range(5)]
body_vectors = [[0.34] * 1536 for _ in range(5)]

collection = client.collections.use("MyCollection")

with collection.batch.fixed_size(batch_size=200) as batch:
for i, data_row in enumerate(data_rows):
batch.add_object(
properties=data_row,
vector={
"title": title_vectors[i],
"body": body_vectors[i],
}
)
if batch.number_errors > 10:
print("Batch import stopped due to excessive errors.")
break

failed_objects = collection.batch.failed_objects
if failed_objects:
print(f"Number of failed imports: {len(failed_objects)}")
print(f"First failed object: {failed_objects[0]}")

使用引用导入

您可以批量创建从一个对象到另一个对象的链接,通过交叉引用。

py docs  API 文档
更多信息文档中的代码片段反映了最新的客户端库和 Weaviate 数据库版本。请查看 发行说明 以获取特定版本。

如果某个片段无法工作或您有任何反馈,请打开一个 GitHub issue
collection = client.collections.use("Author")

with collection.batch.fixed_size(batch_size=100) as batch:
batch.add_reference(
from_property="writesFor",
from_uuid=from_uuid,
to=target_uuid,
)

failed_references = collection.batch.failed_references
if failed_references:
print(f"Number of failed imports: {len(failed_references)}")
print(f"First failed reference: {failed_references[0]}")

Python 特定的注意事项

Python 客户端具有内置的批量方法,可帮助您优化导入速度。 有关详细信息,请参阅客户端文档

异步 Python 客户端和批量

目前,异步 Python 客户端不支持批量。 要使用批量,请使用同步 Python 客户端。

从大文件流式传输数据

如果您的数据集很大,请考虑流式传输导入以避免内存不足的问题。

要尝试示例代码,请下载示例数据并创建示例输入文件。

获取示例数据
import requests

# Download the json file
response = requests.get(
"https://raw.githubusercontent.com/weaviate-tutorials/intro-workshop/main/data/jeopardy_1k.json"
)

# Write the json file to disk
data = response.json()
with open('jeopardy_1k.json', 'w') as f:
json.dump(data, f)

# # Uncomment this section to create a csv file
# import pandas as pd

# df = pd.read_json("jeopardy_1k.json")
# df.to_csv("jeopardy_1k.csv", index=False)
流式传输 JSON 文件示例代码
py docs  API 文档
更多信息文档中的代码片段反映了最新的客户端库和 Weaviate 数据库版本。请查看 发行说明 以获取特定版本。

如果某个片段无法工作或您有任何反馈,请打开一个 GitHub issue
import ijson

# Settings for displaying the import progress
counter = 0
interval = 200 # print progress every this many records; should be bigger than the batch_size

print("JSON streaming, to avoid running out of memory on large files...")
with client.batch.fixed_size(batch_size=100) as batch:
with open("jeopardy_1k.json", "rb") as f:
objects = ijson.items(f, "item")
for obj in objects:
properties = {
"question": obj["Question"],
"answer": obj["Answer"],
}
batch.add_object(
collection="JeopardyQuestion",
properties=properties,
# If you Bring Your Own Vectors, add the `vector` parameter here
# vector=obj.vector["default"]
)

# Calculate and display progress
counter += 1
if counter % interval == 0:
print(f"Imported {counter} articles...")


print(f"Finished importing {counter} articles.")
流式传输 CSV 文件示例代码
import pandas as pd

# Settings for displaying the import progress
counter = 0
interval = 200 # print progress every this many records; should be bigger than the batch_size

def add_object(obj) -> None:
global counter
properties = {
"question": obj["Question"],
"answer": obj["Answer"],
}

with client.batch.fixed_size(batch_size=100) as batch:
batch.add_object(
collection="JeopardyQuestion",
properties=properties,
# If you Bring Your Own Vectors, add the `vector` parameter here
# vector=obj.vector["default"]
)

# Calculate and display progress
counter += 1
if counter % interval == 0:
print(f"Imported {counter} articles...")


print("pandas dataframe iterator with lazy-loading, to not load all records in RAM at once...")
with client.batch.fixed_size(batch_size=200) as batch:
with pd.read_csv(
"jeopardy_1k.csv",
usecols=["Question", "Answer", "Category"],
chunksize=100, # number of rows per chunk
) as csv_iterator:
# Iterate through the dataframe chunks and add each CSV record to the batch
for chunk in csv_iterator:
for index, row in chunk.iterrows():
properties = {
"question": row["Question"],
"answer": row["Answer"],
}
batch.add_object(
collection="JeopardyQuestion",
properties=properties,
# If you Bring Your Own Vectors, add the `vector` parameter here
# vector=obj.vector["default"]
)

# Calculate and display progress
counter += 1
if counter % interval == 0:
print(f"Imported {counter} articles...")

print(f"Finished importing {counter} articles.")

批量向量化

v1.25 中添加。

一些 模型提供商提供批量向量化 API,其中每个请求可以包含多个对象。

从 Weaviate v1.25.0 开始,批量导入会自动利用可用的模型提供商的批量向量化 API。 这减少了对模型提供商的请求数量,从而提高了吞吐量。

模型提供商配置

您可以配置每个模型提供商的批量向量化设置,例如每分钟的请求或每分钟的令牌。 以下示例为 Cohere 和 OpenAI 集成设置速率限制,并为两者提供 API 密钥。

请注意,每个提供商公开不同的配置选项。

py docs  API 文档
更多信息文档中的代码片段反映了最新的客户端库和 Weaviate 数据库版本。请查看 发行说明 以获取特定版本。

如果某个片段无法工作或您有任何反馈,请打开一个 GitHub issue
from weaviate.classes.config import Integrations

integrations = [
# Each model provider may expose different parameters
Integrations.cohere(
api_key=cohere_key,
requests_per_minute_embeddings=rpm_embeddings,
),
Integrations.openai(
api_key=openai_key,
requests_per_minute_embeddings=rpm_embeddings,
tokens_per_minute_embeddings=tpm_embeddings, # e.g. OpenAI also exposes tokens per minute for embeddings
),
]
client.integrations.configure(integrations)

其他注意事项

数据导入可能需要大量资源。 在导入大量数据时,请考虑以下事项。

异步导入

实验性

v1.22 开始可用。 这是一个实验性功能。 请谨慎使用。

为了最大限度地提高导入速度,请启用 异步索引

要启用异步索引,请在 Weaviate 配置文件中将 ASYNC_INDEXING 环境变量设置为 true

weaviate:
image: cr.weaviate.io/semitechnologies/weaviate:1.35.3
...
environment:
ASYNC_INDEXING: 'true'
...

自动添加新租户

默认情况下,如果您尝试将对象插入到不存在的租户中,Weaviate 会返回错误。 要更改此行为,以便 Weaviate 创建新租户,请在集合定义中将 autoTenantCreation 设置为 true

自动租户功能从 v1.25.0 开始适用于批量导入,并且从 v1.25.2 开始适用于单个对象插入。

在创建集合时或根据需要重新配置集合以更新设置,可以设置 autoTenantCreation

自动租户功能在导入大量对象时很有用。 如果您的数据可能存在小的不一致或拼写错误,请小心。 例如,名称 TenantOnetenantOneTenntOne 将创建三个不同的租户。

有关详细信息,请参阅 自动租户

更多资源

问题和反馈

如果您有任何问题或反馈,请在 用户论坛 中告诉我们。