香雨站

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 115|回复: 1

【分布式】基于ps-lite的分布式计算实例解析

[复制链接]

3

主题

4

帖子

10

积分

新手上路

Rank: 1

积分
10
发表于 2023-4-9 16:43:59 | 显示全部楼层 |阅读模式
1. ps-lite

ps-lite是参数服务器(ps)的一种轻量实现,用于构建高可用分布式的机器学习应用。通常是多个节点运行在多台物理机器上用于处理机器学习问题,一般包含一个schedule节点和若干个worker/server节点。
ps-lite的三种节点:

  • Worker:负责主要的计算工作,如读取数据,数据预处理,梯度计算等。它通过push和pull的方式和server节点进行通信。例如,worker节点push计算得到的梯度到server,或者从server节点pull最新的参数。
  • Server:用于维护和更新模型权重。每个server节点维护部分模型信息。
  • Scheduler:用于监听其他节点的存活状态。也负责发送控制命令(比如Server和Worker的连接通信),并且收集其它节点的工作进度。
ps-lite 支持同步和异步两种机制




  • 在同步的机制下,系统运行的时间是由最慢的worker节点与通信时间决定的
  • 在异步的机制下,每个worker不用等待其它workers完成再运行下一次迭代。这样可以提高效率,但从迭代次数的角度来看,会减慢收敛的速度。
ps-lite常应用于推荐系统中的ctr预测中,用以解决数据量较大,特征较多的问题,同时也有很多开发者贡献自己的应用源码。本文就结合基于ps-lite实现的分布式lr为例,记录一下ps-lite的具体使用,代码放在了:
在 https://github.com/xswang/xflow 的基础上修改的[1]
2. 代码实例

接下来就以分布式LR为例,简单介绍一下代码的各部分功能:
主函数:
int main(int argc, char *argv[]) {
  if (ps::IsScheduler()) {
    std::cout << "start scheduler" << std::endl;
  }
  if (ps::IsServer()) {
    std::cout << "start server" << std::endl;
    xflow::Server* server = new xflow::Server();
  }
  ps::Start();
  if (ps::IsWorker()) {
    std::cout << "start worker" << std::endl;
    int epochs = std::atoi(argv[4]);
    if (*(argv[3]) == '0') {
      std::cout << "start LR " << std::endl;
      xflow::LRWorker* lr_worker = new xflow::LRWorker(argv[1], argv[2]);
      lr_worker->epochs = epochs;
      lr_worker->train();
    }
    if (*(argv[3]) == '1') {
      std::cout << "start FM " << std::endl;
      xflow::FMWorker* fm_worker = new xflow::FMWorker(argv[1], argv[2]);
      fm_worker->epochs = epochs;
      fm_worker->train();
    }
    if (*(argv[3]) == '2') {
      std::cout<< "start MVM " << std::endl;
      xflow::MVMWorker* mvm_worker = new xflow::MVMWorker(argv[1], argv[2]);
      mvm_worker->epochs = epochs;
      mvm_worker->train();
    }
  }
  ps::Finalize();
}

  • 对于 Scheduler 节点,直接调用ps-lite的Start()和Finalize()启动和准备回收;
  • 对于 Server 节点,可以定义一个单独的Server类;
  • 对于 Worker 节点,对于不同的模型方法定义对应的处理类,例如:xflow::LRWorker;
2.1 Scheduler 节点
对于scheduler节点进行初始化,Scheduler机器对应的IP地址设定为$DMLC_PS_ROOT_URI
2.2 Server 节点
Server类的定义:
namespace xflow {
class Server {
public:
  Server() {
    server_w_ = new ps::KVServer<float>(0);
    server_w_->set_request_handle(SGD::KVServerSGDHandle_w());
    //server_w_->set_request_handle(FTRL::KVServerFTRLHandle_w());

    server_v_ = new ps::KVServer<float>(1);
    //server_v_->set_request_handle(FTRL::KVServerFTRLHandle_v());
    server_v_->set_request_handle(SGD::KVServerSGDHandle_v());
    std::cout << "init server success " << std::endl;
  }
  ~Server() {}
  ps::KVServer<float>* server_w_;
  ps::KVServer<float>* server_v_;
};
}  // namespace xflow
#endif  // SRC_MODEL_SERVER_H_

其中定义了两个KVServer类型的权值: server_w_、server_v_对应模型的w和b,然后将在request handler中定义优化策略,这部分是server的核心处理逻辑,以SGD为例:
typedef struct SGDEntry_w {
    SGDEntry_w(int k = w_dim) {
      w.resize(k, 0.0);
    }
    std::vector<float> w;
  } sgdentry_w;

  struct KVServerSGDHandle_w {
    void operator()(const ps::KVMeta& req_meta,
        const ps::KVPairs<float>& req_data,
        ps::KVServer<float>* server) {
      size_t keys_size = req_data.keys.size();
      size_t vals_size = req_data.vals.size();
      ps::KVPairs<float> res;

      if (req_meta.push) {
        w_dim = vals_size / keys_size;
        CHECK_EQ(keys_size, vals_size / w_dim);
      } else {
        res.keys = req_data.keys;
        res.vals.resize(keys_size * w_dim);
      }

      for (size_t i = 0; i < keys_size; ++i) {
        ps::Key key = req_data.keys;
        SGDEntry_w& val = store[key];
        for (int j = 0; j < w_dim; ++j) {
          if (req_meta.push) {
            float g = req_data.vals[i * w_dim + j];
            val.w[j] -= learning_rate * g;
          } else {
            for (int j = 0; j < w_dim; ++j) {
              res.vals[i * w_dim + j] = val.w[j];
            }
          }
        }
      }
      server->Response(req_meta, res);
    }

   private:
    std::unordered_map<ps::Key, sgdentry_w> store;
  };

  • 对于push操作,接收来自Worker计算的梯度,然后记录到维护的map里
  • 对于pull操作,按照key将上一轮更新的weight拿出来返回给Worker
  • 具体的优化算法,同步或者异步都可以在这里实现,上述实现的是简单的异步SGD
2.3 Worker 节点
Worker类的构造函数:
  LRWorker(const char *train_file,
    const char *test_file) :
                           train_file_path(train_file),
                           test_file_path(test_file) {
    kv_w_ = new ps::KVWorker<float>(0);
    base_ = new Base;
    core_num = std::thread::hardware_concurrency();
    pool_ = new ThreadPool(core_num);
  }
此处定义了一个线程池,利用多线程来计算梯度,代码中也包含了线程池的具体实现。
读取数据之后,进行训练:
while (1) {
        train_data_loader.load_minibatch_hash_data_fread();
        if (train_data->fea_matrix.size() <= 0) break;
        int thread_size = train_data->fea_matrix.size() / core_num;
        gradient_thread_finish_num = core_num;
        for (int i = 0; i < core_num; ++i) {
          int start = i * thread_size;
          int end = (i + 1)* thread_size;
          pool->enqueue(std::bind(&LRWorker::update, this, start, end));
        }
        while (gradient_thread_finish_num > 0) {
          usleep(5);
        }
        ++block;
      }
将数据分块,每个线程处理一份数据,调用LRWorker::update函数对参数进行更新
void LRWorker::update(int start, int end) {
    size_t idx = 0;
    auto all_keys = std::vector<Base::sample_key>();
    auto unique_keys = std::vector<ps::Key>();;
    int line_num = 0;
    for (int row = start; row < end; ++row) {
      int sample_size = train_data->fea_matrix[row].size();
      Base::sample_key sk;
      sk.sid = line_num;
      for (int j = 0; j < sample_size; ++j) {
        idx = train_data->fea_matrix[row][j].fid;
        sk.fid = idx;
        all_keys.push_back(sk);
        unique_keys.push_back(idx);
      }
      ++line_num;
    }
    std::sort(all_keys.begin(), all_keys.end(), base_->sort_finder);
    std::sort(unique_keys.begin(), unique_keys.end());
    unique_keys.erase(unique(unique_keys.begin(), unique_keys.end()),
                        unique_keys.end());
    int keys_size = (unique_keys).size();

    auto w = std::vector<float>(keys_size);
    auto push_gradient = std::vector<float>(keys_size);
    kv_w_->Wait(kv_w_->Pull(unique_keys, &(w)));
    auto loss = std::vector<float>(end - start);
    calculate_loss(w, all_keys, unique_keys, start, end, loss);
    calculate_gradient(all_keys, unique_keys, loss, push_gradient);

    kv_w_->Wait(kv_w_->Push(unique_keys, push_gradient));
    --gradient_thread_finish_num;
  }
这部分是Worker的核心处理逻辑,包含以下几个方面:

  • 结合数据组织的Key(ps::Key),去掉没有用到的特征;
  • 根据参数对应的Key从Server那里Pull取最新的参数;
  • 用最新的参数计算loss和gradient,code较长,感兴趣的可以到github了解
  • 计算得到的gradient,然后Push到Server中,进行参数的更新
直到Worker的迭代结束,Server和Scheduler也相继关闭链接,调用ps::Finalize() 进行回收。
简单的分布式LR就这样实现了。
参考


  • ^xflow https://github.com/xswang/xflow
回复

使用道具 举报

3

主题

8

帖子

14

积分

新手上路

Rank: 1

积分
14
发表于 2023-4-9 16:44:31 | 显示全部楼层
想问下worker在初始化时也是pull部分参数吗?代码里哪里能够证实?Thx
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|香雨站

GMT+8, 2025-7-3 15:04 , Processed in 0.086044 second(s), 22 queries .

Powered by Discuz! X3.4

© 2001-2013 Comsenz Inc.. 技术支持 by 巅峰设计

快速回复 返回顶部 返回列表