47 故障容错:如何在Worker崩溃时进行重新调度?
你好,我是郑建勋。
上一节课,我们用随机的方式为资源分配了它所属的Worker。这一节课,让我们更进一步优化资源的分配。
对资源进行分配不仅发生在正常的事件内,也可能发生在Worker节点崩溃等特殊时期。这时,我们需要将崩溃的Worker节点中的任务转移到其他节点。
Master调度的时机
具体来说,分配资源的时机可能有下面三种情况。
- 当Master成为Leader时。
- 当客户端调用Master API进行资源的增删查改时。
- 当Master监听到Worker节点发生变化时。
其中,第二点“调用Master API进行资源的增删查改”我们会在这节课的最后完成,下面让我们实战一下剩下两点是如何实现资源的调度的。
Master成为Leader时的资源调度
在日常实践中,Leader的频繁切换并不常见。不管是Master在初始化时选举成为了Leader,还是在中途由于其他Master异常退出导致Leader发生了切换,我们都要全量地更新一下当前Worker的节点状态以及资源的状态。
在Master成为Leader节点时,我们首先要利用m.updateWorkNodes 方法全量加载当前的Worker节点,同时利用m.loadResource 方法全量加载当前的爬虫资源。
func (m *Master) BecomeLeader() error {
m.updateWorkNodes()
if err := m.loadResource(); err != nil {
return fmt.Errorf("loadResource failed:%w", err)
}
m.reAssign()
atomic.StoreInt32(&m.ready, 1)
return nil
}
接下来,调用reAssign方法完成一次资源的分配。m.reAssign会遍历资源,当发现有资源还没有分配节点时,将再次尝试将资源分配到Worker中。如果发现资源都已经分配给了对应的Worker,它就会查看当前节点是否存活。如果当前节点已经不存在了,就将该资源分配给其他的节点。
func (m *Master) reAssign() {
rs := make([]*ResourceSpec, 0, len(m.resources))
for _, r := range m.resources {
if r.AssignedNode == "" {
rs = append(rs, r)
continue
}
id, err := getNodeID(r.AssignedNode)
if err != nil {
m.logger.Error("get nodeid failed", zap.Error(err))
}
if _, ok := m.workNodes[id]; !ok {
rs = append(rs, r)
}
}
m.AddResources(rs)
}
func (m *Master) AddResources(rs []*ResourceSpec) {
for _, r := range rs {
m.addResources(r)
}
}
之前我们已经维护了资源的ID、事件以及分配的Worker节点等信息。 在这里让我们更进一步,当资源分配到节点上时,更新节点的状态。
为此我抽象出了一个新的结构NodeSpec,我们用它来描述Worker节点的状态。NodeSpec封装了Worker注册到etcd中的节点信息registry.Node。同时,我们额外增加了一个Payload字段,用于标识当前Worker节点的负载。当资源分配到对应的Worker节点上时,则更新Worker节点的状态,让Payload负载加1。
type NodeSpec struct {
Node *registry.Node
Payload int
}
func (m *Master) addResources(r *ResourceSpec) (*NodeSpec, error) {
ns, err := m.Assign(r)
...
r.AssignedNode = ns.Node.Id + "|" + ns.Node.Address
_, err = m.etcdCli.Put(context.Background(), getResourcePath(r.Name), encode(r))
m.resources[r.Name] = r
ns.Payload++
return ns, nil
}
Worker节点发生变化时的资源更新
当我们发现Worker节点发生变化时,也需要全量完成一次更新。这是为了及时发现当前已经崩溃的Worker节点,并将这些崩溃的Worker节点下的任务转移给其他Worker节点运行。
如下所示,当Master监听workerNodeChange通道,发现Worker节点产生了变化之后,就会像成为Leader一样,更新当前节点与资源的状态,然后调用m.reAssign方法重新调度资源。
func (m *Master) Campaign() {
...
for {
select {
case resp := <-workerNodeChange:
m.logger.Info("watch worker change", zap.Any("worker:", resp))
m.updateWorkNodes()
if err := m.loadResource(); err != nil {
m.logger.Error("loadResource failed:%w", zap.Error(err))
}
m.reAssign()
}
}
}
负载均衡的资源分配算法
接下来,我们再重新看看资源的分配。上节课我们都是将资源随机分配到某一个Worker上的,但是在实践中很可能会有多个Worker,而为了对资源进行合理的分配,需要实现负载均衡,让Worker节点分摊工作量。
负载均衡分配资源的算法有很多,例如轮询法、加权轮询法、随机法、最小负载法等等,而根据实际场景,还可能需要有特殊的调度逻辑。这里我们实现一种简单的调度算法:最小负载法。在我们当前的场景中,最小负载法能够比较均衡地将爬虫任务分摊到Worker节点中。它每一次都将资源分配给具有最低负载的Worker节点,这依赖于我们维护的节点的状态。
如下所示,第一步我们遍历所有的Worker节点,找到合适的Worker节点。其实这一步可以完成一些简单的筛选,过滤掉一些不匹配的Worker。举一个例子,有些任务比较特殊,在计算时需要使用到GPU,那么我们就只能将它调度到有GPU的Worker节点中。这里我们没有实现更复杂的筛选逻辑,把当前全量的Worker节点都作为候选节点,放入到了candidates队列中。
func (m *Master) Assign(r *ResourceSpec) (*NodeSpec, error) {
candidates := make([]*NodeSpec, 0, len(m.workNodes))
for _, node := range m.workNodes {
candidates = append(candidates, node)
}
// 找到最低的负载
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].Payload < candidates[j].Payload
})
if len(candidates) > 0 {
return candidates[0], nil
}
return nil, errors.New("no worker nodes")
}
第二步,根据负载对Worker队列进行排序。这里我使用了标准库sort中的Slice函数。Slice函数的第一个参数为candidates队列;第二个参数是一个函数,它可以指定排序的优先级条件,这里我们指定负载越小的Worker节点优先级越高。所以在排序之后,负载最小的Worker节点会排在前面。
第三步,取排序之后的第一个节点作为目标Worker节点。
现在,让我们来验证一下资源分配是否成功实现了负载均衡。首先,启动两个Worker节点。
» go run main.go worker --id=1 --http=:8080 --grpc=:9090
» go run main.go worker --id=2 --http=:8079 --grpc=:9089
接着,我们在配置文件中加入5个任务,并启动一个Master节点。
Master在初始化时就会完成任务的分配,我们可以在etcd中查看资源的分配情况,结果如下所示。
» docker exec etcd-gcr-v3.5.6 /bin/sh -c "/usr/local/bin/etcdctl get --prefix /resources" jackson@bogon
/resources/douban_book_list
{"ID":"1604065810010083328","Name":"douban_book_list","AssignedNode":"go.micro.server.worker-2|192.168.0.107:9089","CreationTime":1671274065865783000}
/resources/task-test-1
{"ID":"1604066677018857472","Name":"task-test-1","AssignedNode":"go.micro.server.worker-1|192.168.0.107:9090","CreationTime":1671274272579882000}
/resources/task-test-2
{"ID":"1604066699756179456","Name":"task-test-2","AssignedNode":"go.micro.server.worker-2|192.168.0.107:9089","CreationTime":1671274278001122000}
/resources/task-test-3
{"ID":"1604066716206239744","Name":"task-test-3","AssignedNode":"go.micro.server.worker-1|192.168.0.107:9090","CreationTime":1671274281922539000}
/resources/xxx
{"ID":"1604065810026860544","Name":"xxx","AssignedNode":"go.micro.server.worker-1|192.168.0.107:9090","CreationTime":1671274065869756000}
观察资源分配的Worker节点,会发现当前有3个任务分配到了go.micro.server.worker-2,有2个节点分配到了go.micro.server.worker-1,说明我们现在的负载均衡策略符合预期。
接下来,让我们删除worker-1节点,验证一下worker-1中的资源是否会自动迁移到worker-2中。输入 Ctrl+C
退出worker-1节点,然后回到etcd中查看资源分配的情况,发现所有的资源都已经迁移到了worker-2中。这说明当Worker节点崩溃后,重新调度任务的策略是符合预期的。
» docker exec etcd-gcr-v3.5.6 /bin/sh -c "/usr/local/bin/etcdctl get --prefix /resources" jackson@bogon
/resources/douban_book_list
{"ID":"1604065810010083328","Name":"douban_book_list","AssignedNode":"go.micro.server.worker-2|192.168.0.107:9089","CreationTime":1671274065865783000}
/resources/task-test-1
{"ID":"1604069265235775488","Name":"task-test-1","AssignedNode":"go.micro.server.worker-2|192.168.0.107:9089","CreationTime":1671274889679244000}
/resources/task-test-2
{"ID":"1604066699756179456","Name":"task-test-2","AssignedNode":"go.micro.server.worker-2|192.168.0.107:9089","CreationTime":1671274278001122000}
/resources/task-test-3
{"ID":"1604069265252552704","Name":"task-test-3","AssignedNode":"go.micro.server.worker-2|192.168.0.107:9089","CreationTime":1671274889683174000}
/resources/xxx
{"ID":"1604069265269329920","Name":"xxx","AssignedNode":"go.micro.server.worker-2|192.168.0.107:9089","CreationTime":1671274889687807000}
最后我们来看看Master Leader切换时的情况。我们新建一个Master,它的ID为3。输入Ctrl+C中断之前的Master节点。
这时再次查看etcd中的资源分配情况,会发现资源的信息没有任何变化。这是符合预期的,因为当前的资源在之前都已经分配给了Worker,不需要再重新分配了。
实战Master资源处理API
接下来,让我们为Master实现对外暴露的API,方便外部客户端进行访问,实现资源的增删查改。按照惯例,我们仍然会为API实现GRPC协议和HTTP协议。
首先,我们要在crawler.proto中书写Master服务的Protocol Buffer协议。
我们先为Master加入两个RPC接口。其中,AddResource接口用于增加资源,参数为结构体ResourceSpec,表示添加资源的信息。其中最重要的参数是name,它标识了具体启动哪一个爬虫任务。返回值为结构体NodeSpec,NodeSpec描述了资源分配到Worker节点的信息。 DeleteResource接口用于删除资源,请求参数为资源信息,而不需要有任务返回值信息,因此这里定义为空结构体Empty。为了引用Empty,在这里我们导入了google/protobuf/empty.proto库。
syntax = "proto3";
option go_package = "proto/crawler";
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
service CrawlerMaster {
rpc AddResource(ResourceSpec) returns (NodeSpec) {
option (google.api.http) = {
post: "/crawler/resource"
body: "*"
};
}
rpc DeleteResource(ResourceSpec) returns (google.protobuf.Empty){
option (google.api.http) = {
delete: "/crawler/resource"
body: "*"
};
}
}
message ResourceSpec {
string id = 1;
string name = 2;
string assigned_node = 3;
int64 creation_time = 4;
}
message NodeSpec {
string id = 1;
string Address = 2;
}
代码中的 option 是 GRPC-gateway 使用的信息,用于生成与GRPC方法对应的HTTP代理请求。在option中,AddResource对应的HTTP方法为POST,URL为/crawler/resource。
DeleteResource对应的URL仍然为/crawler/resource,不过HTTP方法为DELETE。 body: "*"
表示GRPC-gateway将接受HTTP Body中的信息,并会将其解析为对应的请求。
下一步,执行protoc命令,生成对应的micro GRPC文件和HTTP代理文件。
» protoc -I $GOPATH/src -I . --micro_out=. --go_out=. --go-grpc_out=. --grpc-gateway_out=logtostderr=true,allow_delete_body=true,register_func_suffix=Gw:. crawler.proto
这里的allow_delete_body表示对于HTTP DELETE方法,HTTP代理服务也可以解析Body中的信息,并将其转换为请求参数。
接下来,我们需要为Master书写对应的方法,让Master实现micro生成的CrawlerMasterHandler接口。
type CrawlerMasterHandler interface {
AddResource(context.Context, *ResourceSpec, *NodeSpec) error
DeleteResource(context.Context, *ResourceSpec, *empty.Empty) error
}
实现 DeleteResource 和 AddResource 这两个方法比较简单。其中,DeleteResource负责判断当前的任务名是否存在,如果存在则调用etcd delete方法删除资源Key,并更新节点的负载。而AddResource方法可以调用我们之前就写好的 m.addResources 方法来添加资源,返回资源分配的节点信息。
func (m *Master) DeleteResource(ctx context.Context, spec *proto.ResourceSpec, empty *empty.Empty) error {
r, ok := m.resources[spec.Name]
if !ok {
return errors.New("no such task")
}
if _, err := m.etcdCli.Delete(context.Background(), getResourcePath(spec.Name)); err != nil {
return err
}
if r.AssignedNode != "" {
nodeID, err := getNodeID(r.AssignedNode)
if err != nil {
return err
}
if ns, ok := m.workNodes[nodeID]; ok {
ns.Payload -= 1
}
}
return nil
}
func (m *Master) AddResource(ctx context.Context, req *proto.ResourceSpec, resp *proto.NodeSpec) error {
nodeSpec, err := m.addResources(&ResourceSpec{Name: req.Name})
if nodeSpec != nil {
resp.Id = nodeSpec.Node.Id
resp.Address = nodeSpec.Node.Address
}
return err
}
最后,我们还要调用micro生成的crawler.RegisterCrawlerMasterHandler函数,将Master注册为GRPC服务。这之后就可以正常处理客户端的访问了。
func RunGRPCServer(MasterService *master.Master, logger *zap.Logger, reg registry.Registry, cfg ServerConfig) {
...
if err := crawler.RegisterCrawlerMasterHandler(service.Server(), MasterService); err != nil {
logger.Fatal("register handler failed", zap.Error(err))
}
if err := service.Run(); err != nil {
logger.Fatal("grpc server stop", zap.Error(err))
}
}
让我们来验证一下Master是否正在正常对外提供服务。
首先,启动Master。接着,通过HTTP访问Master提供的添加资源的接口。如下所示,添加资源“task-test-4”。
» go run main.go master --id=2 --http=:8081 --grpc=:9091
» curl -H "content-type: application/json" -d '{"id":"zjx","name": "task-test-4"}' http://localhost:8081/crawler/resource
{"id":"go.micro.server.worker-2", "Address":"192.168.0.107:9089"}
通过返回值可以看到,当前资源分配到了worker-2,worker-2的IP地址为"192.168.0.107:9089"。
查看etcd中的资源,发现资源已经成功写入了etcd,而且其中分配的Worker节点信息与HTTP接口返回的信息相同。
» docker exec etcd-gcr-v3.5.6 /bin/sh -c "/usr/local/bin/etcdctl get /resources/task-test-4"
/resources/task-test-4
{"ID":"1604109125694787584","Name":"task-test-4","AssignedNode":"go.micro.server.worker-2|192.168.0.107:9089","CreationTime":1671284393144648000}
接着,我们尝试调用Master服务的删除资源接口,删除我们刚刚生成添加的资源。
» curl -X DELETE -H "content-type: application/json" -d '{"name": "task-test-4"}' http://localhost:8081/crawler/resource
再次查看etcd中的资源"task-test-4",发现资源已经被删除了。Master API提供的添加和删除功能验证成功。
总结
这节课,我们对资源分配的时机和资源分配的算法进行了优化。我们模拟了Master和Worker节点崩溃的情况,并用简单的方式实现了节点的重新分配,让当前系统在分布式下具备了故障容错的特性。
在Master调度的时机上,当Master成为Leader,Worker节点崩溃,或者外部调用资源增删查改接口时,Leader 需要对资源进行重新调度。对于调度算法,为了实现负载的均衡,我选择了当前负载最低的Worker的节点作为了优先级最高的节点。在实践中,我们需要结合对应的业务场景设计出最合适的调度逻辑。
最后,我们还为Master实现了GRPC与HTTP API,让Masrer具备了添加资源和删除资源的能力。
课后题
这节课,我们用简单的方式实现了负载均衡的调度算法,但在生产实践中,调度器可能会更复杂。例如有些资源的负载会更大,有些资源只能在某一个Worker上执行,有些资源需要具有亲和性等等。你认为应该如何处理这些情况呢?
如何让我们的程序轻松地切换到另一个调度算法上?
(提示:可以参考 Kubernetes 对资源的复杂调度。)
欢迎你在留言区与我交流讨论,我们下节课见。
- Geek_2c2c44 👍(0) 💬(1)
master调用DeleteResource之后, 只不过worker在下一次在loadresource的时候不会加载被删除的任务而已, 那woker已经运行的爬虫任务岂不是还在运行?
2024-01-31