Weaviate 实例可以被复制。复制可以提高读取吞吐量,提高可用性,并实现零停机时间升级。
有关 Weaviate 中复制的设计和构建方式的更多详细信息,请参阅 复制架构。
集合的复制因子不能通过更新集合的定义来更新。
从 v1.32 版本开始,通过使用 副本移动,可以更改分片的 复制因子。
默认情况下,复制被禁用。可以在 集合配置中为每个集合启用它。这意味着您可以为数据集中的每个类设置不同的复制因子。
要启用复制,您可以设置以下其中一个或两个
REPLICATION_MINIMUM_FACTOR 环境变量用于整个 Weaviate 实例,或者
- 集合的
replicationFactor 参数。
Weaviate 范围内的最小复制因子
REPLICATION_MINIMUM_FACTOR 环境变量设置 Weaviate 实例中所有集合的最小复制因子。
如果您设置了 集合的复制因子,则该集合的复制因子将覆盖最小复制因子。
集合的复制因子
from weaviate.classes.config import Configure
client.collections.create(
"Article",
replication_config=Configure.replication(
factor=3,
),
)
import { configure } from 'weaviate-client';
await client.collections.create({
name: 'Article',
replication: configure.replication({
factor: 1
}),
})
package main
import (
"context"
"github.com/weaviate/weaviate-go-client/v5/weaviate"
"github.com/weaviate/weaviate/entities/models"
)
func main() {
cfg := weaviate.Config{
Host: "localhost:8080",
Scheme: "http",
}
client, err := weaviate.NewClient(cfg)
if err != nil {
panic(err)
}
classObj := &models.Class{
Class: "Article",
Properties: []*models.Property{
{
DataType: []string{"string"},
Name: "title",
}
},
ReplicationConfig: &models.ReplicationConfig{
Factor: 3,
}
}
err := client.Schema().ClassCreator().WithClass(classObj).Do(context.Background())
if err != nil {
panic(err)
}
}
package io.weaviate;
import java.util.ArrayList;
import io.weaviate.client.Config;
import io.weaviate.client.WeaviateClient;
import io.weaviate.client.base.Result;
import io.weaviate.client.v1.schema.model.DataType;
import io.weaviate.client.v1.schema.model.Property;
import io.weaviate.client.v1.schema.model.WeaviateClass;
import io.weaviate.client.v1.misc.model.ReplicationConfig;
public class App {
public static void main(String[] args) {
Config config = new Config("http", "localhost:8080");
WeaviateClient client = new WeaviateClient(config);
ReplicationConfig replicationConfig = ReplicationConfig.builder()
.factor(3)
.build();
WeaviateClass clazz = WeaviateClass.builder()
.className("Article")
.replicationConfig(replicationConfig)
.properties(new ArrayList() { {
add(Property.builder()
.dataType(new ArrayList(){ { add(DataType.STRING); } })
.name("title")
.build());
} })
.build();
Result<Boolean> result = client.schema().classCreator().withClass(clazz).run();
if (result.hasErrors()) {
System.out.println(result.getError());
return;
}
System.out.println(result.getResult());
}
}
curl \
-X POST \
-H "Content-Type: application/json" \
-d '{
"class": "Article",
"properties": [
{
"dataType": [
"string"
],
"description": "Title of the article",
"name": "title"
}
],
"replicationConfig": {
"factor": 3
}
}' \
https://:8080/v1/schema
在此示例中,有三个副本。如果您在导入数据之前设置了复制因子,则所有数据将复制三次。
可以在添加数据到集合之后修改复制因子。之后修改复制因子,新数据将复制到新的和预先存在的副本节点。
示例数据模式具有 写入一致性级别为 ALL。当您上传或更新模式时,更改将发送到 ALL 节点(通过协调器节点)。协调器节点等待来自 ALL 节点的成功确认,然后才能向客户端发送成功消息。这可确保在您的分布式 Weaviate 设置中具有高度一致的模式。
数据一致性
当 Weaviate 检测到节点间数据不一致时,它会尝试修复不同步的数据。
从 v1.26 开始,Weaviate 添加了 异步复制 以主动检测不一致性。在早期版本中,Weaviate 使用 读取时修复策略在读取时修复不一致性。
读取时修复是自动的。要激活异步复制,请在您的集合定义中的 replicationConfig 部分将 asyncEnabled 设置为 true。
from weaviate.classes.config import Configure
client.collections.create(
"Article",
replication_config=Configure.replication(
factor=3,
async_enabled=True,
),
)
import { configure } from 'weaviate-client';
await client.collections.create({
name: 'Article',
replication: configure.replication({
factor: 1,
asyncEnabled: true,
}),
})
curl \
-X POST \
-H "Content-Type: application/json" \
-d '{
"class": "Article",
"properties": [
{
"dataType": [
"string"
],
"description": "Title of the article",
"name": "title"
}
],
"replicationConfig": {
"factor": 3,
"asyncEnabled": true
}
}' \
https://:8080/v1/schema
配置异步复制设置
用于配置异步复制的 环境变量(ASYNC_*)是在 v1.29 中引入的。
异步复制有助于实现跨多个节点复制的数据的一致性。
根据您的特定用例更新以下 环境变量 以配置异步复制。
日志记录
- 设置日志记录的频率:
ASYNC_REPLICATION_LOGGING_FREQUENCY 定义异步复制后台进程记录事件的频率。
数据比较
- 设置比较频率:
ASYNC_REPLICATION_FREQUENCY 定义每个节点比较其本地数据与其它节点的频率。
- 设置比较超时:
ASYNC_REPLICATION_DIFF_PER_NODE_TIMEOUT 可选地配置在节点无响应时比较期间的等待超时时间。
- 监控节点可用性:
ASYNC_REPLICATION_ALIVE_NODES_CHECKING_FREQUENCY 每当节点可用性发生变化时触发比较。
- 配置哈希树高度:
ASYNC_REPLICATION_HASHTREE_HEIGHT 指定哈希树的大小,这有助于通过在多个级别比较哈希摘要而不是扫描整个数据集来缩小数据差异。有关异步复制的内存和性能注意事项,请参阅 此页面。
- 摘要比较的批处理大小:
ASYNC_REPLICATION_DIFF_BATCH_SIZE 定义在传播实际对象之前,比较节点间其摘要(例如,上次更新时间)的对象数量。
数据同步
检测到节点之间的差异后,Weaviate 会传播过时或丢失的数据。按照以下方式配置同步
- 设置传播频率:
ASYNC_REPLICATION_FREQUENCY_WHILE_PROPAGATING 在节点上完成同步后,暂时将数据比较频率调整为设置的值。
- 设置传播超时:
ASYNC_REPLICATION_PROPAGATION_TIMEOUT 可选地配置在节点无响应时传播期间的等待超时时间。
- 设置传播延迟:
ASYNC_REPLICATION_PROPAGATION_DELAY 定义延迟期,以允许异步写入操作到达所有节点,然后再传播新的或更新的对象。
- 数据传播的批处理大小:
ASYNC_REPLICATION_PROPAGATION_BATCH_SIZE 定义在传播阶段发送的每个批次中的对象数量。
- 设置传播限制:
ASYNC_REPLICATION_PROPAGATION_LIMIT 强制限制每个复制迭代传播的未同步对象的数量。
- 设置传播并发:
ASYNC_REPLICATION_PROPAGATION_CONCURRENCY 指定可以同时将对象批次发送到其它节点的并发工作器数量,从而允许同时发送多个传播批次。
根据您的集群大小和网络延迟调整这些设置,以实现最佳性能。对于高流量集群,较小的批处理大小和较短的超时时间可能更有益,而较大的集群可能需要更保守的设置。
如何使用:查询
当您添加(写入)或查询(读取)数据时,集群中的一个或多个副本节点将响应请求。需要多少节点发送成功的响应和确认到协调器节点取决于 consistency_level。可用的 一致性级别 是 ONE、QUORUM(replication_factor / 2 + 1)和 ALL。
consistency_level 可以在查询时指定
# Get an object by ID, with consistency level ONE
curl "https://:8080/v1/objects/{ClassName}/{id}?consistency_level=ONE"
在 v1.17 中,只有 通过 ID 获取数据的读取查询 具有可调整的一致性级别。所有其它对象特定的 REST 端点(读取或写入)使用一致性级别 ALL。从 v1.18 开始,所有写入和读取查询都可以调整为 ONE、QUORUM(默认)或 ALL。GraphQL 端点使用一致性级别 ONE(在两个版本中)。
from weaviate.classes.config import ConsistencyLevel
questions = client.collections.use(collection_name).with_consistency_level(
consistency_level=ConsistencyLevel.QUORUM
)
response = collection.query.fetch_object_by_id("36ddd591-2dee-4e7e-a3cc-eb86d30a4303")
for o in response.objects:
print(o.properties)
const myCollection = client.collections.use('Article').withConsistency('QUORUM');
const result = await myCollection.query.fetchObjectById("36ddd591-2dee-4e7e-a3cc-eb86d30a4303")
console.log(JSON.stringify(result, null, 2));
package main
import (
"context"
"fmt"
"github.com/weaviate/weaviate-go-client/v5/weaviate/data/replication"
"github.com/weaviate/weaviate-go-client/v5/weaviate"
)
func main() {
cfg := weaviate.Config{
Host: "localhost:8080",
Scheme: "http",
}
client, err := weaviate.NewClient(cfg)
if err != nil {
panic(err)
}
data, err := client.Data().ObjectsGetter().
WithClassName("MyClass").
WithID("36ddd591-2dee-4e7e-a3cc-eb86d30a4303").
WithConsistencyLevel(replication.ConsistencyLevel.ONE).
Do(context.Background())
if err != nil {
panic(err)
}
fmt.Printf("%v", data)
}
var jeopardyWithConsistency = client.collections.use("JeopardyQuestion")
.withConsistencyLevel(ConsistencyLevel.QUORUM);
var response = jeopardyWithConsistency.query.fetchObjectById(uuid);
System.out.println(response.get().properties());
package io.weaviate;
import java.util.List;
import io.weaviate.client.Config;
import io.weaviate.client.WeaviateClient;
import io.weaviate.client.base.Result;
import io.weaviate.client.v1.data.model.WeaviateObject;
import io.weaviate.client.v1.data.replication.model.ConsistencyLevel;
public class App {
public static void main(String[] args) {
Config config = new Config("http", "localhost:8080");
WeaviateClient client = new WeaviateClient(config);
Result<List<WeaviateObject>> result = client.data().objectsGetter()
.withClassName("MyClass")
.withID("36ddd591-2dee-4e7e-a3cc-eb86d30a4303")
.withConsistencyLevel(ConsistencyLevel.ONE)
.run();
if (result.hasErrors()) {
System.out.println(result.getError());
return;
}
System.out.println(result.getResult());
}
}
var jeopardy = client
.Collections.Use("JeopardyQuestion")
.WithConsistencyLevel(ConsistencyLevels.Quorum);
var response = await jeopardy.Query.FetchObjectByID((Guid)validId);
Console.WriteLine(response);
curl "https://:8080/v1/objects/MyClass/36ddd591-2dee-4e7e-a3cc-eb86d30a4303?consistency_level=QUORUM"
# The parameter "consistency_level" can be one of ALL, QUORUM (default), or ONE. Determines how many
# replicas must acknowledge a request before it is considered successful.
# curl "/v1/objects/{ClassName}/{id}?consistency_level=ONE"
副本移动和状态
除了设置初始复制因子之外,您还可以主动管理 Weaviate 集群中分片副本的放置。这对于在扩展后重新平衡数据、退役节点或优化数据局部性很有用。副本移动通过一组专用的 RESTful API 端点 或 通过客户端库以编程方式进行管理。
相关页面
问题和反馈
如果您有任何问题或反馈,请在 用户论坛 中告诉我们。