| 
 | 
	
 
1. ps-lite 
 
ps-lite是参数服务器(ps)的一种轻量实现,用于构建高可用分布式的机器学习应用。通常是多个节点运行在多台物理机器上用于处理机器学习问题,一般包含一个schedule节点和若干个worker/server节点。 
ps-lite的三种节点: 
 
- Worker:负责主要的计算工作,如读取数据,数据预处理,梯度计算等。它通过push和pull的方式和server节点进行通信。例如,worker节点push计算得到的梯度到server,或者从server节点pull最新的参数。
 
 - Server:用于维护和更新模型权重。每个server节点维护部分模型信息。
 
 - Scheduler:用于监听其他节点的存活状态。也负责发送控制命令(比如Server和Worker的连接通信),并且收集其它节点的工作进度。
 
  ps-lite 支持同步和异步两种机制 
 
 
  
 
- 在同步的机制下,系统运行的时间是由最慢的worker节点与通信时间决定的
 
 - 在异步的机制下,每个worker不用等待其它workers完成再运行下一次迭代。这样可以提高效率,但从迭代次数的角度来看,会减慢收敛的速度。
 
  ps-lite常应用于推荐系统中的ctr预测中,用以解决数据量较大,特征较多的问题,同时也有很多开发者贡献自己的应用源码。本文就结合基于ps-lite实现的分布式lr为例,记录一下ps-lite的具体使用,代码放在了: 
在 https://github.com/xswang/xflow 的基础上修改的[1] 2. 代码实例 
 
接下来就以分布式LR为例,简单介绍一下代码的各部分功能: 
主函数: 
int main(int argc, char *argv[]) { 
  if (ps::IsScheduler()) { 
    std::cout << &#34;start scheduler&#34; << std::endl; 
  } 
  if (ps::IsServer()) { 
    std::cout << &#34;start server&#34; << std::endl; 
    xflow::Server* server = new xflow::Server(); 
  } 
  ps::Start(); 
  if (ps::IsWorker()) { 
    std::cout << &#34;start worker&#34; << std::endl; 
    int epochs = std::atoi(argv[4]); 
    if (*(argv[3]) == &#39;0&#39;) { 
      std::cout << &#34;start LR &#34; << std::endl; 
      xflow::LRWorker* lr_worker = new xflow::LRWorker(argv[1], argv[2]); 
      lr_worker->epochs = epochs; 
      lr_worker->train(); 
    } 
    if (*(argv[3]) == &#39;1&#39;) { 
      std::cout << &#34;start FM &#34; << std::endl; 
      xflow::FMWorker* fm_worker = new xflow::FMWorker(argv[1], argv[2]); 
      fm_worker->epochs = epochs; 
      fm_worker->train(); 
    } 
    if (*(argv[3]) == &#39;2&#39;) { 
      std::cout<< &#34;start MVM &#34; << std::endl; 
      xflow::MVMWorker* mvm_worker = new xflow::MVMWorker(argv[1], argv[2]); 
      mvm_worker->epochs = epochs; 
      mvm_worker->train(); 
    } 
  } 
  ps::Finalize(); 
} 
 
- 对于 Scheduler 节点,直接调用ps-lite的Start()和Finalize()启动和准备回收;
 
 - 对于 Server 节点,可以定义一个单独的Server类;
 
 - 对于 Worker 节点,对于不同的模型方法定义对应的处理类,例如:xflow::LRWorker;
 
  2.1 Scheduler 节点 
对于scheduler节点进行初始化,Scheduler机器对应的IP地址设定为$DMLC_PS_ROOT_URI 
2.2 Server 节点 
Server类的定义: 
namespace xflow { 
class Server { 
 public: 
  Server() { 
    server_w_ = new ps::KVServer<float>(0); 
    server_w_->set_request_handle(SGD::KVServerSGDHandle_w()); 
    //server_w_->set_request_handle(FTRL::KVServerFTRLHandle_w()); 
 
    server_v_ = new ps::KVServer<float>(1); 
    //server_v_->set_request_handle(FTRL::KVServerFTRLHandle_v()); 
    server_v_->set_request_handle(SGD::KVServerSGDHandle_v()); 
    std::cout << &#34;init server success &#34; << std::endl; 
  } 
  ~Server() {} 
  ps::KVServer<float>* server_w_; 
  ps::KVServer<float>* server_v_; 
}; 
}  // namespace xflow 
#endif  // SRC_MODEL_SERVER_H_ 
 
其中定义了两个KVServer类型的权值: server_w_、server_v_对应模型的w和b,然后将在request handler中定义优化策略,这部分是server的核心处理逻辑,以SGD为例: 
typedef struct SGDEntry_w { 
    SGDEntry_w(int k = w_dim) { 
      w.resize(k, 0.0); 
    } 
    std::vector<float> w; 
  } sgdentry_w; 
 
  struct KVServerSGDHandle_w { 
    void operator()(const ps::KVMeta& req_meta, 
        const ps::KVPairs<float>& req_data, 
        ps::KVServer<float>* server) { 
      size_t keys_size = req_data.keys.size(); 
      size_t vals_size = req_data.vals.size(); 
      ps::KVPairs<float> res; 
 
      if (req_meta.push) { 
        w_dim = vals_size / keys_size; 
        CHECK_EQ(keys_size, vals_size / w_dim); 
      } else { 
        res.keys = req_data.keys; 
        res.vals.resize(keys_size * w_dim); 
      } 
 
      for (size_t i = 0; i < keys_size; ++i) { 
        ps::Key key = req_data.keys; 
        SGDEntry_w& val = store[key]; 
        for (int j = 0; j < w_dim; ++j) { 
          if (req_meta.push) { 
            float g = req_data.vals[i * w_dim + j]; 
            val.w[j] -= learning_rate * g; 
          } else { 
            for (int j = 0; j < w_dim; ++j) { 
              res.vals[i * w_dim + j] = val.w[j]; 
            } 
          } 
        } 
      } 
      server->Response(req_meta, res); 
    } 
 
   private: 
    std::unordered_map<ps::Key, sgdentry_w> store; 
  }; 
 
- 对于push操作,接收来自Worker计算的梯度,然后记录到维护的map里
 
 - 对于pull操作,按照key将上一轮更新的weight拿出来返回给Worker
 
 - 具体的优化算法,同步或者异步都可以在这里实现,上述实现的是简单的异步SGD
 
  2.3 Worker 节点 
Worker类的构造函数: 
  LRWorker(const char *train_file, 
    const char *test_file) : 
                           train_file_path(train_file), 
                           test_file_path(test_file) { 
    kv_w_ = new ps::KVWorker<float>(0); 
    base_ = new Base; 
    core_num = std::thread::hardware_concurrency(); 
    pool_ = new ThreadPool(core_num); 
  } 
此处定义了一个线程池,利用多线程来计算梯度,代码中也包含了线程池的具体实现。 
读取数据之后,进行训练: 
while (1) { 
        train_data_loader.load_minibatch_hash_data_fread(); 
        if (train_data->fea_matrix.size() <= 0) break; 
        int thread_size = train_data->fea_matrix.size() / core_num; 
        gradient_thread_finish_num = core_num; 
        for (int i = 0; i < core_num; ++i) { 
          int start = i * thread_size; 
          int end = (i + 1)* thread_size; 
          pool->enqueue(std::bind(&LRWorker::update, this, start, end)); 
        } 
        while (gradient_thread_finish_num > 0) { 
          usleep(5); 
        } 
        ++block; 
      } 
将数据分块,每个线程处理一份数据,调用LRWorker::update函数对参数进行更新 
void LRWorker::update(int start, int end) { 
    size_t idx = 0; 
    auto all_keys = std::vector<Base::sample_key>(); 
    auto unique_keys = std::vector<ps::Key>();; 
    int line_num = 0; 
    for (int row = start; row < end; ++row) { 
      int sample_size = train_data->fea_matrix[row].size(); 
      Base::sample_key sk; 
      sk.sid = line_num; 
      for (int j = 0; j < sample_size; ++j) { 
        idx = train_data->fea_matrix[row][j].fid; 
        sk.fid = idx; 
        all_keys.push_back(sk); 
        unique_keys.push_back(idx); 
      } 
      ++line_num; 
    } 
    std::sort(all_keys.begin(), all_keys.end(), base_->sort_finder); 
    std::sort(unique_keys.begin(), unique_keys.end()); 
    unique_keys.erase(unique(unique_keys.begin(), unique_keys.end()), 
                        unique_keys.end()); 
    int keys_size = (unique_keys).size(); 
 
    auto w = std::vector<float>(keys_size); 
    auto push_gradient = std::vector<float>(keys_size); 
    kv_w_->Wait(kv_w_->Pull(unique_keys, &(w))); 
    auto loss = std::vector<float>(end - start); 
    calculate_loss(w, all_keys, unique_keys, start, end, loss); 
    calculate_gradient(all_keys, unique_keys, loss, push_gradient); 
 
    kv_w_->Wait(kv_w_->Push(unique_keys, push_gradient)); 
    --gradient_thread_finish_num; 
  } 
这部分是Worker的核心处理逻辑,包含以下几个方面: 
 
- 结合数据组织的Key(ps::Key),去掉没有用到的特征;
 
 - 根据参数对应的Key从Server那里Pull取最新的参数;
 
 - 用最新的参数计算loss和gradient,code较长,感兴趣的可以到github了解
 
 - 计算得到的gradient,然后Push到Server中,进行参数的更新
 
  直到Worker的迭代结束,Server和Scheduler也相继关闭链接,调用ps::Finalize() 进行回收。 
简单的分布式LR就这样实现了。 
参考 
 
 
- ^xflow https://github.com/xswang/xflow
 
 
  |   
 
 
 
 |