
How to use the Mars Remote API to execute Python functions


This article explains how to use the Mars Remote API to execute Python functions. The content is straightforward and easy to follow; please read along step by step.

Mars is a parallel and distributed framework for Python. It can accelerate well-known libraries such as numpy, pandas, and scikit-learn, as well as plain Python functions, using multiple cores or multiple machines. Parallelizing and distributing Python functions is mainly done through the Mars Remote API.

For details on starting a Mars distributed environment, see the following options (a minimal connection sketch follows the list):

Command-line deployment on a cluster.

Deployment on Kubernetes.

Out-of-the-box environment on MaxCompute; users who have purchased the MaxCompute service can use it directly.
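However the cluster is started, the client usually only needs to point a session at it. The snippet below is a minimal sketch (not from the original article), assuming new_session() accepts the web endpoint of an already-deployed cluster; the URL is a placeholder to replace with your own.

import mars

# Assumption: with no argument new_session() starts a local session;
# with an address it attaches the client to a deployed Mars cluster.
session = mars.new_session('http://<mars-web-endpoint>')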

How to use Mars Remote API

Using the Mars Remote API is very simple: with only a few changes to the original code, it can be executed in a distributed fashion.

Take calculating π with the Monte Carlo method as an example. The code below defines two functions: calc_chunk counts, for each chunk, how many random points fall inside the unit circle, and calc_pi aggregates the results of several calc_chunk calls to obtain the final value of π.

from typing import List

import numpy as np


def calc_chunk(n: int, i: int):
    # generate n random points (x and y between -1 and 1) and compute
    # their distances from the origin
    rs = np.random.RandomState(i)
    a = rs.uniform(-1, 1, size=(n, 2))
    d = np.linalg.norm(a, axis=1)
    # count how many points fall inside the unit circle
    return (d < 1).sum()


def calc_pi(fs: List[int], N: int):
    # aggregate the results of several calc_chunk calls to compute pi
    return sum(fs) * 4 / N


N = 200_000_000
n = 10_000_000

fs = [calc_chunk(n, i) for i in range(N // n)]
pi = calc_pi(fs, N)
print(pi)

Under %%time the result is:

3.1416312
CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s
Wall time: 12.3 s

It takes 12.3 s on a single machine.

To parallelize this computation with the Mars Remote API, we do not need to change the functions at all; only the last part needs to change.

import mars.remote as mr

# replace the function calls with mars.remote.spawn
fs = [mr.spawn(calc_chunk, args=(n, i)) for i in range(N // n)]
# pass the list of spawned objects as an argument and spawn a new function on it
pi = mr.spawn(calc_pi, args=(fs, N))
# trigger execution with execute() and retrieve the result with fetch()
print(pi.execute().fetch())

Under %%time the result is:

3.1416312
CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms
Wall time: 2.85 s

The result is exactly the same, but performance improves severalfold. As you can see, the Mars Remote API requires almost no changes to existing Python code, yet it effectively parallelizes and distributes the execution.

An example

To help readers understand what the Mars Remote API can do, let's start from another example. Suppose we have a dataset and want to run a classification task on it. There are many algorithms and libraries to choose from; here we use RandomForest, LogisticRegression, and XGBoost.

The difficulty is that, besides having several models to choose from, each model also has several hyperparameters. Which hyperparameters work best? For those without much tuning experience, you only know after running them. So we want to generate a set of candidate hyperparameters, run them all, and compare the results.

Preparing the data

In this example we use the otto dataset.

First, we prepare the data. After reading it, we split it into a training set and a test set at a 2:1 ratio.

import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split


def gen_data():
    df = pd.read_csv('otto/train.csv')
    X = df.drop(['target', 'id'], axis=1)
    y = df['target']
    label_encoder = LabelEncoder()
    label_encoder.fit(y)
    y = label_encoder.transform(y)
    return train_test_split(X, y, test_size=0.33, random_state=123)


X_train, X_test, y_train, y_test = gen_data()

Models

Next, we use scikit-learn's RandomForest and LogisticRegression for classification.

RandomForest:

from sklearn.ensemble import RandomForestClassifier


def random_forest(X_train: pd.DataFrame, y_train: pd.Series,
                  verbose: bool = False, **kw):
    model = RandomForestClassifier(verbose=verbose, **kw)
    model.fit(X_train, y_train)
    return model

Then we generate the hyperparameters for RandomForest, yielding them one combination at a time.

def gen_random_forest_parameters():
    for n_estimators in [50, 100, 600]:
        for max_depth in [None, 3, 15]:
            for criterion in ['gini', 'entropy']:
                yield {
                    'n_estimators': n_estimators,
                    'max_depth': max_depth,
                    'criterion': criterion
                }

LogisticRegression follows the same process. We first define the model.

from sklearn.linear_model import LogisticRegression


def logistic_regression(X_train: pd.DataFrame, y_train: pd.Series,
                        verbose: bool = False, **kw):
    model = LogisticRegression(verbose=verbose, **kw)
    model.fit(X_train, y_train)
    return model

Then we generate the hyperparameters for LogisticRegression.

def gen_lr_parameters():
    for penalty in ['l2', 'none']:
        for tol in [0.1, 0.01, 1e-4]:
            yield {
                'penalty': penalty,
                'tol': tol
            }

XGBoost is handled in the same way; we use XGBClassifier for the classification task.

from xgboost import XGBClassifier


def xgb(X_train: pd.DataFrame, y_train: pd.Series,
        verbose: bool = False, **kw):
    model = XGBClassifier(verbosity=int(verbose), **kw)
    model.fit(X_train, y_train)
    return model

And generate a series of hyperparameters for it.

def gen_xgb_parameters():
    for n_estimators in [100, 600]:
        for criterion in ['gini', 'entropy']:
            for learning_rate in [0.001, 0.1, 0.5]:
                yield {
                    'n_estimators': n_estimators,
                    'criterion': criterion,
                    'learning_rate': learning_rate
                }

Validation

Next we write the validation logic; here we use log_loss as the evaluation function.

import pickle

from sklearn.metrics import log_loss


def metric_model(model, X_test: pd.DataFrame, y_test: pd.Series) -> float:
    if isinstance(model, bytes):
        model = pickle.loads(model)
    y_pred = model.predict_proba(X_test)
    return log_loss(y_test, y_pred)


def train_and_metric(train_func, train_params: dict,
                     X_train: pd.DataFrame, y_train: pd.Series,
                     X_test: pd.DataFrame, y_test: pd.Series,
                     verbose: bool = False):
    # package training and validation together
    model = train_func(X_train, y_train, verbose=verbose, **train_params)
    metric = metric_model(model, X_test, y_test)
    return model, metric

Finding the best model

After these preparations, we start running the models. For each model, we pass each generated set of hyperparameters to train_and_metric. In addition to these hyperparameters, we also set n_jobs to -1 to make better use of the multiple cores on a single machine.

results = []

# -------------
# Random Forest
# -------------
for params in gen_random_forest_parameters():
    print(f'calculating on {params}')
    # fixed random_state
    params['random_state'] = 123
    # use all CPU cores
    params['n_jobs'] = -1
    model, metric = train_and_metric(random_forest, params,
                                     X_train, y_train, X_test, y_test)
    print(f'metric: {metric}')
    results.append({'model': model, 'metric': metric})

# -------------------
# Logistic Regression
# -------------------
for params in gen_lr_parameters():
    print(f'calculating on {params}')
    # fixed random_state
    params['random_state'] = 123
    # use all CPU cores
    params['n_jobs'] = -1
    model, metric = train_and_metric(logistic_regression, params,
                                     X_train, y_train, X_test, y_test)
    print(f'metric: {metric}')
    results.append({'model': model, 'metric': metric})

# -------
# XGBoost
# -------
for params in gen_xgb_parameters():
    print(f'calculating on {params}')
    # fixed random_state
    params['random_state'] = 123
    # use all CPU cores
    params['n_jobs'] = -1
    model, metric = train_and_metric(xgb, params,
                                     X_train, y_train, X_test, y_test)
    print(f'metric: {metric}')
    results.append({'model': model, 'metric': metric})

This takes quite a long time to run, so we omit part of the output.

calculating on {'max_depth': None, 'criterion': 'gini'}
metric: 0.6964123781828575
calculating on {'max_depth': None, 'criterion': 'entropy'}
metric: 0.691231279083228
... output of the other models omitted ...
CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s
Wall time: 31min 44s

From the CPU time and wall time, we can see that the training makes full use of the multiple cores, but the whole process still took about 31 minutes.
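Although the article does not show it, a natural final step here is to pick the best configuration out of results. A minimal sketch (not in the original), assuming the results list built above:

# pick the entry with the lowest log_loss; `best` is introduced here for illustration
best = min(results, key=lambda r: r['metric'])
print(best['metric'])
best_model = best['model']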

Using the Remote API for distributed acceleration

Now let's try to speed up the whole process in a distributed way using the Remote API.

For the cluster, we use the third method mentioned at the beginning and create a cluster directly on MaxCompute. You can choose one of the other ways; the effect is the same.

n_cores = 8
mem = 2 * n_cores  # 16G
# o is the MaxCompute entry object; here a cluster of 10 workers is created,
# each worker with 8 cores and 16 GB of memory
cluster = o.create_mars_cluster(10, n_cores, mem, image='extended')

To make it easy to read the data in a distributed way, we make a minor change to the data processing and upload the data as a MaxCompute resource. In other environments, users can consider storage such as HDFS, Aliyun OSS, or Amazon S3.

if not o.exist_resource('otto_train.csv'):
    with open('otto/train.csv') as f:
        # upload the file as a resource
        o.create_resource('otto_train.csv', 'file', fileobj=f)


def gen_data():
    # changed to read from the resource
    df = pd.read_csv(o.open_resource('otto_train.csv'))
    X = df.drop(['target', 'id'], axis=1)
    y = df['target']
    label_encoder = LabelEncoder()
    label_encoder.fit(y)
    y = label_encoder.transform(y)
    return train_test_split(X, y, test_size=0.33, random_state=123)

With a few changes, we use the mars.remote.spawn method to schedule gen_data to run on the cluster.

import mars.remote as mr

# n_output=4 indicates that the function has four outputs;
# after execute(), the data is read into the Mars cluster
data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute()
# the remote_* names are Mars objects; the data now lives in the cluster
# and these objects are only references to it
remote_X_train, remote_X_test, remote_y_train, remote_y_test = data

At present, Mars can correctly serialize numpy ndarrays, pandas DataFrames, and so on, but it cannot serialize models yet, so we make a slight change to train_and_metric and pickle the model before returning it.

def distributed_train_and_metric(train_func, train_params: dict,
                                 X_train: pd.DataFrame, y_train: pd.Series,
                                 X_test: pd.DataFrame, y_test: pd.Series,
                                 verbose: bool = False):
    model, metric = train_and_metric(train_func, train_params,
                                     X_train, y_train, X_test, y_test,
                                     verbose=verbose)
    return pickle.dumps(model), metric

Once a later version of Mars supports serializing models, the original function can be spawned directly.

Then we make a slight change to the previous execution flow and rewrite all the function calls with mars.remote.spawn.

import numpy as np

tasks = []
models = []
metrics = []

# -------------
# Random Forest
# -------------
for params in gen_random_forest_parameters():
    # fixed random_state
    params['random_state'] = 123
    task = mr.spawn(distributed_train_and_metric,
                    args=(random_forest, params,
                          remote_X_train, remote_y_train,
                          remote_X_test, remote_y_test),
                    kwargs={'verbose': 2},
                    n_output=2)
    tasks.extend(task)
    # store the model and the metric separately
    models.append(task[0])
    metrics.append(task[1])

# -------------------
# Logistic Regression
# -------------------
for params in gen_lr_parameters():
    # fixed random_state
    params['random_state'] = 123
    task = mr.spawn(distributed_train_and_metric,
                    args=(logistic_regression, params,
                          remote_X_train, remote_y_train,
                          remote_X_test, remote_y_test),
                    kwargs={'verbose': 2},
                    n_output=2)
    tasks.extend(task)
    # store the model and the metric separately
    models.append(task[0])
    metrics.append(task[1])

# -------
# XGBoost
# -------
for params in gen_xgb_parameters():
    # fixed random_state
    params['random_state'] = 123
    # also specify the number of cores to use
    params['n_jobs'] = n_cores
    task = mr.spawn(distributed_train_and_metric,
                    args=(xgb, params,
                          remote_X_train, remote_y_train,
                          remote_X_test, remote_y_test),
                    kwargs={'verbose': 2},
                    n_output=2)
    tasks.extend(task)
    # store the model and the metric separately
    models.append(task[0])
    metrics.append(task[1])

# shuffle the tasks so that they are spread evenly across the workers
shuffled_tasks = np.random.permutation(tasks)
_ = mr.ExecutableTuple(shuffled_tasks).execute()

You can see that the code is almost identical.

Run it and view the results:

CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms
Wall time: 1min 59s

The time dropped from 31 minutes to about 2 minutes, a speedup of more than 15x, while the cost of the code changes is negligible.
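To actually compare the runs on the client side, the metric objects can be fetched back, just as fetch() was used in the π example. A minimal sketch (not in the original article), assuming the tasks above have been executed; best_idx and best_model are names introduced here for illustration:

import pickle

# fetch the log_loss values of all runs back to the client
metric_values = [m.fetch() for m in metrics]
# index of the configuration with the lowest log_loss
best_idx = int(np.argmin(metric_values))
# the model was returned pickled, so unpickle it after fetching
best_model = pickle.loads(models[best_idx].fetch())
print(metric_values[best_idx])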

Careful readers may have noticed that in the distributed code we enabled verbose output for the models. In a distributed environment, because these functions are executed remotely, anything printed goes only to the standard output of the worker, so we do not see it on the client side. However, Mars provides a very useful interface for viewing the runtime output of each model.

Taking the 0th model as an example, we can call the fetch_log method directly on the Mars object.

print(models[0].fetch_log())

Here is an abridged part of the output:

building tree 1 of 50
building tree 2 of 50
building tree 3 of 50
building tree 4 of 50
building tree 5 of 50
building tree 6 of 50
... omitted ...
building tree 49 of 50
building tree 50 of 50

The same can be done for any model you want. Imagine how troublesome it would be to inspect this intermediate output without the fetch_log API: first, you would not know which worker the function was executed on; and even if you did, each worker may execute more than one function, so the output could be mixed together or drowned out by large logs. The fetch_log interface lets users stop caring about which worker a function ran on, and avoids mixed-up logs.
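For instance, a small loop (a sketch added here, not in the original) would pull back the worker-side log of every spawned model in the same way:

# print each model's worker-side output
for i, m in enumerate(models):
    print(f'--- log of model {i} ---')
    print(m.fetch_log())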

Thank you for reading. That concludes "how to use Mars Remote API to execute Python functions". After studying this article, you should have a deeper understanding of how to use the Mars Remote API to execute Python functions; specific usage still needs to be verified in practice.
