48 完善核心能力:Master请求转发与Worker资源管理
你好,我是郑建勋。
这节课,让我们继续优化Master服务,实现Master请求转发和并发情况下的资源保护,同时实现Worker对分配资源的监听。
将Master请求转发到Leader
首先我们需要考虑一下,当Master是Follower状态,同时还接收到了请求的情形。在之前的设计中,为了避免并发处理时可能出现的异常情况,我们只打算让Leader来处理请求。所以,当Master节点接收到请求时,如果当前节点不是Leader,我们可以直接报错,由客户端选择正确的Leader节点。如下所示。
func (m *Master) AddResource(ctx context.Context, req *proto.ResourceSpec, resp *proto.NodeSpec) error {
if !m.IsLeader() {
return errors.New("no leader")
}
}
我们还可以采用另外一种更常见的方式:将接收到的请求转发给Leader。要实现这一点,首先所有Master节点要在Leader发生变更时,将当前最新的Leader地址保存到leaderID中。
func (m *Master) Campaign() {
select {
case resp := <-leaderChange:
m.logger.Info("watch leader change", zap.String("leader:", string(resp.Kvs[0].Value)))
m.leaderID = string(resp.Kvs[0].Value)
}
for {
select {
case err := <-leaderCh:
m.leaderID = m.ID
case resp := <-leaderChange:
if len(resp.Kvs) > 0 {
m.logger.Info("watch leader change", zap.String("leader:", string(resp.Kvs[0].Value)))
m.leaderID = string(resp.Kvs[0].Value)
}
}
}
在处理请求前,首先判断当前Master的状态,如果它不是Leader,就获取Leader的地址并完成请求的转发。注意,这里如果不指定Leader的地址,go-micro就会随机选择一个地址进行转发。
func (m *Master) AddResource(ctx context.Context, req *proto.ResourceSpec, resp *proto.NodeSpec) error {
if !m.IsLeader() && m.leaderID != "" && m.leaderID != m.ID {
addr := getLeaderAddress(m.leaderID)
nodeSpec, err := m.forwardCli.AddResource(ctx, req, client.WithAddress(addr))
resp.Id = nodeSpec.Id
resp.Address = nodeSpec.Address
return err
}
nodeSpec, err := m.addResources(&ResourceSpec{Name: req.Name})
if nodeSpec != nil {
resp.Id = nodeSpec.Node.Id
resp.Address = nodeSpec.Node.Address
}
return err
}
在转发时,我们使用了micro生成的GRPC客户端,这是通过在初始化时导入 micro GRPC client的插件实现的。SetForwardCli方法将生成的GRPC client 注入到了Master结构体中。
import (
grpccli "github.com/go-micro/plugins/v4/client/grpc"
)
func RunGRPCServer(m *master.Master, logger *zap.Logger, reg registry.Registry, cfg ServerConfig) {
service := micro.NewService(
...
micro.Client(grpccli.NewClient()),
)
cl := proto.NewCrawlerMasterService(cfg.Name, service.Client())
m.SetForwardCli(cl)
}
接下来,我们来验证一下服务是否能够正确地转发。
首先启动一个 Worker和一个Master服务,当前的 Leader会变成master2,IP地址为192.168.0.105:9091。
现在我们启动一个新的Master服务master3。
接着访问master3服务暴露的HTTP接口。虽然master3并不是Leader,但是访问master3添加资源时,操作仍然能够成功。
» curl --request POST 'http://localhost:8082/crawler/resource' --header 'Content-Type: application/json' --data '{"id":"zjx","name": "task-forward"}'
{"id":"go.micro.server.worker-1","Address":"192.168.0.105:9090"}
同时,我们在Leader服务的日志中能够看到请求信息,验证成功。
{"level":"INFO","ts":"2022-12-29T17:23:55.792+0800","caller":"master/master.go:198","msg":"receive request","method":"CrawlerMaster.AddResource","Service":"go.micro.server.master","request param:":{"id":"zjx","name":"task-forward"}}
资源保护
由于Worker节点与Resource资源一直在动态变化当中,因此如果不考虑数据的并发安全,在复杂线上场景下,就可能出现很多难以解释的现象。
为了避免数据的并发安全问题,我们之前利用了通道来进行协程间的通信,但如果我们现在希望保护Worker节点与Resource资源,其实当前场景下更好的方式是使用原生的互斥锁。这是因为我们只希望在关键位置加锁,其他的逻辑仍然是并行的。如果我们在读取一个变量时还要用通道来通信,代码会变得不优雅。
我们来看下使用原生互斥锁的操作是怎样的。如下,在Master中添加sync.Mutex互斥锁,用于资源的并发安全。
我们可以在资源更新(资源加载与增删查改)、Worker节点更新、资源分配的阶段都加入互斥锁如下所示。
func (m *Master) DeleteResource(ctx context.Context, spec *proto.ResourceSpec, empty *empty.Empty) error {
m.rlock.Lock()
defer m.rlock.Unlock()
r, ok := m.resources[spec.Name]
...
}
func (m *Master) AddResource(ctx context.Context, req *proto.ResourceSpec, resp *proto.NodeSpec) error {
...
m.rlock.Lock()
defer m.rlock.Lock()
nodeSpec, err := m.addResources(&ResourceSpec{Name: req.Name})
if nodeSpec != nil {
resp.Id = nodeSpec.Node.Id
resp.Address = nodeSpec.Node.Address
}
return err
}
func (m *Master) updateWorkNodes() {
services, err := m.registry.GetService(worker.ServiceName)
if err != nil {
m.logger.Error("get service", zap.Error(err))
}
m.rlock.Lock()
defer m.rlock.Unlock()
...
m.workNodes = nodes
}
func (m *Master) loadResource() error {
resp, err := m.etcdCli.Get(context.Background(), RESOURCEPATH, clientv3.WithPrefix(), clientv3.WithSerializable())
...
resources := make(map[string]*ResourceSpec)
m.rlock.Lock()
defer m.rlock.Unlock()
m.resources = resources
}
func (m *Master) reAssign() {
rs := make([]*ResourceSpec, 0, len(m.resources))
m.rlock.Lock()
defer m.rlock.Unlock()
for _, r := range m.resources {...}
for _, r := range rs {
m.addResources(r)
}
}
当外部访问Leader的HTTP接口时,实际上服务端会开辟一个协程并发处理请求。通过使用互斥锁,我们消除了并发访问同一资源可能出现的问题。在实践中,需要合理地使用互斥锁,尽量让锁定的范围足够小,锁定的资源足够少,减少锁等待的时间。
Worker单机模式
接下来我们回到Worker。Worker可以有两种模式,集群模式与单机模式。我们可以在Worker中加一个flag来切换Worker运行的模式。
对于少量的任务,可以直接用单机版的Worker来处理,种子节点来自于配置文件。而对于集群版的Worker,任务将来自Master的分配。
要切换Worker模式只要判断一个flag值cluster即可做到。如下所示,在启动Worker时,如果cluster为false,代表为单机模式。如果cluster为true,代表是集群模式。
WorkerCmd.Flags().BoolVar(
&cluster, "cluster", true, "run mode")
var cluster bool
func (c *Crawler) Run(cluster bool) {
if !cluster {
c.handleSeeds()
}
go c.Schedule()
for i := 0; i < c.WorkCount; i++ {
go c.CreateWork()
}
c.HandleResult()
}
Worker集群模式
在集群模式下,我们还需要书写Worker加载和监听etcd资源这一重要的功能。首先来看看初始化时的资源加载,在初始时,我们生成了etcd client,并注入到Crawler结构中。
endpoints := []string{e.registryURL}
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
return nil, err
}
e.etcdCli = cli
资源加载
Crawler.loadResource方法用于从etcd中加载资源。我们调用etcd Get方法,获取前缀为/resources
的全量资源列表。解析这些资源,查看当前资源分配的节点是否为当前节点。如果分配的节点和当前节点匹配,意味着当前资源是分配给当前节点的,不是当前节点的资源将会被直接忽略。
func (c *Crawler) loadResource() error {
resp, err := c.etcdCli.Get(context.Background(), master.RESOURCEPATH, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil {
return fmt.Errorf("etcd get failed")
}
resources := make(map[string]*master.ResourceSpec)
for _, kv := range resp.Kvs {
r, err := master.Decode(kv.Value)
if err == nil && r != nil {
id := getID(r.AssignedNode)
if len(id) > 0 && c.id == id {
resources[r.Name] = r
}
}
}
c.Logger.Info("leader init load resource", zap.Int("lenth", len(resources)))
c.rlock.Lock()
defer c.rlock.Unlock()
c.resources = resources
for _, r := range c.resources {
c.runTasks(r.Name)
}
return nil
}
资源加载完毕后,分配给当前节点的任务会执行runTask方法,通过任务名从全局任务池中获取爬虫任务,调用t.Rule.Root()获取种子请求,并放入到调度器中执行。
func (c *Crawler) runTasks(taskName string) {
t, ok := Store.Hash[taskName]
if !ok {
c.Logger.Error("can not find preset tasks", zap.String("task name", taskName))
return
}
res, err := t.Rule.Root()
if err != nil {
c.Logger.Error("get root failed",
zap.Error(err),
)
return
}
for _, req := range res {
req.Task = t
}
c.scheduler.Push(res...)
}
资源监听
除了加载资源,在初始化时我们还需要开辟一个新的协程c.watchResource来监听资源的变化。
func (c *Crawler) Run(id string, cluster bool) {
c.id = id
if !cluster {
c.handleSeeds()
}
go c.loadResource()
go c.watchResource()
go c.Schedule()
for i := 0; i < c.WorkCount; i++ {
go c.CreateWork()
}
c.HandleResult()
}
如下所示,我在watchResource函数中书写了一个监听新增资源的功能。watchResource借助etcd client 的Watch方法监听资源的变化。Watch返回值是一个通道,当etcd client监听到etcd中前缀为 /resources
的资源发生变化时,就会将信息写入到通道Watch中。通过通道返回的信息,不仅能够得到当前有变动的资源最新的值,还可以得知当前资源变动的事件是新增、更新还是删除。如果是新增事件,那就调用runTasks启动该资源对应的爬虫任务。
func (c *Crawler) watchResource() {
watch := c.etcdCli.Watch(context.Background(), master.RESOURCEPATH, clientv3.WithPrefix())
for w := range watch {
if w.Err() != nil {
c.Logger.Error("watch resource failed", zap.Error(w.Err()))
continue
}
if w.Canceled {
c.Logger.Error("watch resource canceled")
return
}
for _, ev := range w.Events {
spec, err := master.Decode(ev.Kv.Value)
if err != nil {
c.Logger.Error("decode etcd value failed", zap.Error(err))
}
switch ev.Type {
case clientv3.EventTypePut:
if ev.IsCreate() {
c.Logger.Info("receive create resource", zap.Any("spec", spec))
} else if ev.IsModify() {
c.Logger.Info("receive update resource", zap.Any("spec", spec))
}
c.runTasks(spec.Name)
case clientv3.EventTypeDelete:
c.Logger.Info("receive delete resource", zap.Any("spec", spec))
}
}
}
}
现在让我们来验证一下新增资源的功能,启动Master与Worker节点。
» go run main.go master --id=3 --http=:8082 --grpc=:9092 --pprof=:9982
» go run main.go worker --pprof=:9983
紧接着调用Master的添加资源接口。
» curl -H "content-type: application/json" -d '{"id":"zjx","name": "douban_book_list"}' <http://localhost:8082/crawler/resource> jackson@localhost
{"id":"go.micro.server.worker-1", "Address":"192.168.0.105:9090"}
可以看到,Worker日志中任务开始正常地执行了,验证成功。完整代码你可以查看v0.4.1分支。
{"level":"DEBUG","ts":"2022-12-30T21:06:56.743+0800","caller":"doubanbook/book.go:77","msg":"parse book tag,count: 47"}
{"level":"DEBUG","ts":"2022-12-30T21:07:00.532+0800","caller":"doubanbook/book.go:108","msg":"parse book list,count: 20 url: <https://book.douban.com/tag/随笔>"}
{"level":"DEBUG","ts":"2022-12-30T21:07:04.240+0800","caller":"doubanbook/book.go:108","msg":"parse book list,count: 20 url: <https://book.douban.com/tag/散文>"}
资源删除
接下来让我们继续看看如何删除一个爬虫任务。
我们需要在Watch的选项中设置 clientv3.WithPrevKV()
,这样,当监听到资源的删除时,就能够获取当前删除的资源信息,接着就可以调用 c.deleteTasks
来删除任务了。
func (c *Crawler) watchResource() {
watch := c.etcdCli.Watch(context.Background(), master.RESOURCEPATH, clientv3.WithPrefix(), clientv3.WithPrevKV())
for w := range watch {
for _, ev := range w.Events {
switch ev.Type {
...
case clientv3.EventTypeDelete:
spec, err := master.Decode(ev.PrevKv.Value)
c.Logger.Info("receive delete resource", zap.Any("spec", spec))
if err != nil {
c.Logger.Error("decode etcd value failed", zap.Error(err))
}
c.rlock.Lock()
c.deleteTasks(spec.Name)
c.rlock.Unlock()
}
}
}
}
deleteTasks 会删除 c.resources
中存储的当前Task,并且将Task的Closed变量设置为true。
func (c *Crawler) deleteTasks(taskName string) {
t, ok := Store.Hash[taskName]
if !ok {
c.Logger.Error("can not find preset tasks", zap.String("task name", taskName))
return
}
t.Closed = true
delete(c.resources, taskName)
}
我们在Task中设计了一个新的变量Closed用于标识当前的任务是否已经被删除了。这是因为被删除的任务可能现在还在运行当中,我们通过该变量确认它已经不再运行了。
在一些场景中,我们也可以将标识任务是否已经结束的变量设计为通道类型或者context.Context,然后与select语句结合起来实现多路复用。我们在HTTP标准库中也经常看到这种用法,它可以判断通道的事件与其他事件哪一个先发生。
type Task struct {
Visited map[string]bool
VisitedLock sync.Mutex
//
Closed bool
Rule RuleTree
Options
}
不过我们这里使用一个标识任务是否关闭的bool类型就足够了。在任务流程的核心位置,我们都需要检测该变量。检测到任务关闭时,就不再执行后续的流程。
具体操作是在request.Check方法中,加入对任务是否关闭的判断。
func (r *Request) Check() error {
if r.Depth > r.Task.MaxDepth {
return errors.New("max depth limit reached")
}
if r.Task.Closed {
return errors.New("task has Closed")
}
return nil
}
接着,在任务的采集和调度的两个核心位置检测任务的有效性。一旦发现任务已经被关闭,它所有的请求将不再被调度和采集。
func (c *Crawler) CreateWork() {
for {
req := c.scheduler.Pull()
if err := req.Check(); err != nil {
c.Logger.Debug("check failed",
zap.Error(err),
)
continue
}
}
func (s *Schedule) Schedule() {
var ch chan *spider.Request
var req *spider.Request
for {
...
// 请求校验
if req != nil {
if err := req.Check(); err != nil {
zap.S().Debug("check failed",
zap.Error(err),
)
req = nil
ch = nil
continue
}
}
}
下面让我们来验证一下任务的删除功能是否正常,首先启动一个Master和一个Worker服务。
» go run main.go master --id=3 --http=:8082 --grpc=:9092 --pprof=:9982
» go run main.go worker --pprof=:9983
紧接着,调用Master的添加资源接口,可以看到爬虫任务是正常执行的。
» curl -H "content-type: application/json" -d '{"id":"zjx","name": "douban_book_list"}' <http://localhost:8082/crawler/resource> jackson@localhost
{"id":"go.micro.server.worker-1", "Address":"192.168.0.105:9090"}
然后,我们调用Master的删除资源接口,从Worker中的日志可以看到,Worker监听到了删除资源的事件。在日志打印出 "task has Closed"
的错误信息之后,删除的爬虫任务将不再运行。
{"level":"INFO","ts":"2022-12-31T14:06:33.528+0800","caller":"engine/schedule.go:479","msg":"receive delete resource","spec":{"ID":"1609068436011356160","Name":"douban_book_list","AssignedNode":"go.micro.server.worker-1|192.168.0.105:9090","CreationTime":1672466784878532000}
{"level":"DEBUG","ts":"2022-12-31T14:06:33.845+0800","caller":"engine/schedule.go:336","msg":"check failed","error":"task has Closed"}
{"level":"DEBUG","ts":"2022-12-31T14:06:33.845+0800","caller":"engine/schedule.go:269","msg":"check failed","error":"task has Closed"}
此后,当我们再次调用Master的添加资源接口时,爬虫任务又将恢复如初。删除功能验证成功。
总结
好了,这节课,我们设计了将Master请求转发到Leader的功能,让所有的Master都具备了接收请求的能力。此外,我们还使用了原生的互斥锁解决了并发安全问题。因为通道并不总是解决并发安全问题的最佳方式,在这里如果我们使用通道会减慢程序的并发性,使代码变得不优雅。
最后,我们还实现了在Worker集群模式下任务的加载与监听。在初始化时,我们通过加载etcd中属于当前节点的资源获取了全量的爬虫任务。我们还启动了对etcd资源的监听,实现了资源的动态添加和删除。至此,Master与Worker的核心功能与交互都已经能够正常工作了。
课后题
最后,还是给你留一道思考题。
在我们的设计中,默认一个爬虫任务是不能够被添加多次的。那有没有一种场景,可以让同一个爬虫任务添加多次,也就是让多个Worker可以同时执行同一个爬虫任务呢? 如果有这样的场景,我们应该如何修改设计?
欢迎你在留言区与我交流讨论,我们下节课再见!
- Geek_crazydaddy 👍(0) 💬(2)
watchResource里获取和删除任务时为啥都不判断任务是不是分配给当前worker了?
2023-02-20 - Realm 👍(0) 💬(0)
follow节点在收到资源变更请求,当请求到达grpc服务层时,通过注入进来的master grpc client,向master发起请求,参数不变,实现了转发功能,这个设计很赞!👍
2023-02-07