
How to use Mars Remote API to execute Python functions

Updated 2025-03-28. From: SLTechnology News&Howtos, Shulou (Shulou.com), 05/31

This article explains how to use the Mars Remote API to execute Python functions in parallel and across multiple machines, using two worked examples.

Mars is a parallel and distributed Python framework that makes it easy to use multiple cores or multiple machines to accelerate well-known libraries such as numpy, pandas and scikit-learn, as well as plain Python functions. Parallel and distributed execution of Python functions is handled mainly by the Mars Remote API.

To start a Mars distributed environment, see one of the following:

Deploying on a cluster from the command line.

Deploying on Kubernetes.

The out-of-the-box MaxCompute environment, which can be used directly once the MaxCompute service has been purchased.

How to use Mars Remote API

Using Mars Remote API is simple: existing code can run in a distributed fashion with only a few changes.

Take the Monte Carlo method for calculating π as an example. The code below defines two functions: calc_chunk counts the points that fall inside the circle for each slice, and calc_pi combines the results of several calc_chunk calls to obtain the value of π.
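The estimate rests on an area ratio, which can be written out as a short derivation. Points are drawn uniformly from the square with x and y between -1 and 1 (area 4), and the unit circle inside it has area π. The symbol k_i below is notation introduced here for the count returned by calc_chunk for slice i; it does not appear in the original code.

```latex
\[
\Pr\left(x^2 + y^2 < 1\right) = \frac{\pi}{4}
\quad\Longrightarrow\quad
\pi \approx \frac{4}{N} \sum_{i} k_i
\]
```

Here N is the total number of points across all slices, and the sum over k_i corresponds to sum(fs) in calc_pi.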

    from typing import List
    import numpy as np

    def calc_chunk(n: int, i: int):
        # generate n random points (x and y both between -1 and 1)
        # and count those whose distance from the origin is less than 1
        rs = np.random.RandomState(i)
        a = rs.uniform(-1, 1, size=(n, 2))
        d = np.linalg.norm(a, axis=1)
        return (d < 1).sum()

    def calc_pi(fs: List[int], N: int):
        # combine the results of several calc_chunk calls and compute pi
        return sum(fs) * 4 / N

    N = 200_000_000
    n = 10_000_000

    fs = [calc_chunk(n, i) for i in range(N // n)]
    pi = calc_pi(fs, N)
    print(pi)

Running it under %%time gives:

    3.1416312
    CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s
    Wall time: 12.3 s

On a single machine this takes 12.3 s.

To parallelize this calculation with Mars Remote API, the functions themselves do not need to change at all. Only the last part changes.

    import mars.remote as mr

    # replace the function calls with mars.remote.spawn
    fs = [mr.spawn(calc_chunk, args=(n, i)) for i in range(N // n)]
    # pass the list of spawned objects as an argument and spawn a new function
    pi = mr.spawn(calc_pi, args=(fs, N))
    # execute() triggers execution, fetch() retrieves the result
    print(pi.execute().fetch())

Running it under %%time gives:

    3.1416312
    CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms
    Wall time: 2.85 s

The result is exactly the same, but the run is several times faster.

As you can see, Mars Remote API needs almost no changes to existing Python code to parallelize and distribute it effectively and speed up execution.

An example

To help readers understand what Mars Remote API is for, we start with another example. Suppose we have a dataset and want to run a classification task on it. There are many algorithms and libraries to choose from; here we use RandomForest, LogisticRegression, and XGBoost.

The difficulty is that, besides the choice among several models, each model has several hyperparameters. Which hyperparameters work best? For anyone without much tuning experience, the only way to know is to run them. So we want to generate a set of candidate hyperparameters, run all of them, and compare the results.

Preparing the data

This example uses the otto dataset.

First we prepare the data. After reading it, we split it into a training set and a test set at a ratio of 2:1.

    import pandas as pd
    from sklearn.preprocessing import LabelEncoder
    from sklearn.model_selection import train_test_split

    def gen_data():
        df = pd.read_csv('otto/train.csv')

        X = df.drop(['target', 'id'], axis=1)
        y = df['target']

        label_encoder = LabelEncoder()
        label_encoder.fit(y)
        y = label_encoder.transform(y)

        return train_test_split(X, y, test_size=0.33, random_state=123)

    X_train, X_test, y_train, y_test = gen_data()

Models

Next, we use RandomForest and LogisticRegression from scikit-learn for classification.

RandomForest:

    from sklearn.ensemble import RandomForestClassifier

    def random_forest(X_train: pd.DataFrame,
                      y_train: pd.Series,
                      verbose: bool = False,
                      **kw):
        model = RandomForestClassifier(verbose=verbose, **kw)
        model.fit(X_train, y_train)
        return model

Then we generate the hyperparameters for RandomForest, using yield to return them one combination at a time.

    def gen_random_forest_parameters():
        for n_estimators in [50, 100, 600]:
            for max_depth in [None, 3, 15]:
                for criterion in ['gini', 'entropy']:
                    yield {
                        'n_estimators': n_estimators,
                        'max_depth': max_depth,
                        'criterion': criterion
                    }

LogisticRegression follows the same process. We define the model first.

    from sklearn.linear_model import LogisticRegression

    def logistic_regression(X_train: pd.DataFrame,
                            y_train: pd.Series,
                            verbose: bool = False,
                            **kw):
        model = LogisticRegression(verbose=verbose, **kw)
        model.fit(X_train, y_train)
        return model

Then we generate the hyperparameters for LogisticRegression.

    def gen_lr_parameters():
        for penalty in ['l2', 'none']:
            for tol in [0.1, 0.01, 1e-4]:
                yield {
                    'penalty': penalty,
                    'tol': tol
                }

XGBoost is handled the same way. We use XGBClassifier for the classification task.

    from xgboost import XGBClassifier

    def xgb(X_train: pd.DataFrame,
            y_train: pd.Series,
            verbose: bool = False,
            **kw):
        model = XGBClassifier(verbosity=int(verbose), **kw)
        model.fit(X_train, y_train)
        return model

Generate a series of hyperparameters.

    def gen_xgb_parameters():
        for n_estimators in [100, 600]:
            for criterion in ['gini', 'entropy']:
                for learning_rate in [0.001, 0.1, 0.5]:
                    yield {
                        'n_estimators': n_estimators,
                        'criterion': criterion,
                        'learning_rate': learning_rate
                    }

Validation

Next we write the validation logic, using log_loss as the evaluation function.

    import pickle
    from sklearn.metrics import log_loss

    def metric_model(model,
                     X_test: pd.DataFrame,
                     y_test: pd.Series) -> float:
        # a model returned from the cluster arrives as pickled bytes
        if isinstance(model, bytes):
            model = pickle.loads(model)
        y_pred = model.predict_proba(X_test)
        return log_loss(y_test, y_pred)

    def train_and_metric(train_func,
                         train_params: dict,
                         X_train: pd.DataFrame,
                         y_train: pd.Series,
                         X_test: pd.DataFrame,
                         y_test: pd.Series,
                         verbose: bool = False):
        # bundle training and validation together
        model = train_func(X_train, y_train, verbose=verbose, **train_params)
        metric = metric_model(model, X_test, y_test)
        return model, metric

Finding the best model

With the preparations done, we start running the models. For each model, we train once with every generated set of hyperparameters. In addition to these hyperparameters, we set n_jobs to -1 to make better use of the multiple cores of a single machine.
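To get a sense of how many training runs this involves, the grids defined by the generator functions can be enumerated and counted. The sketch below uses itertools.product instead of nested loops; the helper iter_params and the grids dictionary are names introduced here for illustration, with the values copied from the generators earlier in the article.

```python
from itertools import product

# Hyperparameter grids, copied from the generator functions in the article.
grids = {
    'random_forest': {
        'n_estimators': [50, 100, 600],
        'max_depth': [None, 3, 15],
        'criterion': ['gini', 'entropy'],
    },
    'logistic_regression': {
        'penalty': ['l2', 'none'],
        'tol': [0.1, 0.01, 1e-4],
    },
    'xgb': {
        'n_estimators': [100, 600],
        'criterion': ['gini', 'entropy'],
        'learning_rate': [0.001, 0.1, 0.5],
    },
}


def iter_params(grid):
    # Yield one dict per combination, in the same order as nested for loops.
    names = list(grid)
    for values in product(*(grid[name] for name in names)):
        yield dict(zip(names, values))


counts = {name: sum(1 for _ in iter_params(grid)) for name, grid in grids.items()}
total_runs = sum(counts.values())
print(counts, total_runs)
```

Under these grids there are 18 RandomForest runs, 6 LogisticRegression runs and 12 XGBoost runs, 36 in total, each of which trains a full model.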

    results = []

    # -------------
    # Random Forest
    # -------------

    for params in gen_random_forest_parameters():
        print(f'calculating on {params}')
        # fix random_state
        params['random_state'] = 123
        # use all CPU cores
        params['n_jobs'] = -1
        model, metric = train_and_metric(random_forest, params,
                                         X_train, y_train,
                                         X_test, y_test)
        print(f'metric: {metric}')
        results.append({'model': model,
                        'metric': metric})

    # -------------------
    # Logistic Regression
    # -------------------

    for params in gen_lr_parameters():
        print(f'calculating on {params}')
        # fix random_state
        params['random_state'] = 123
        # use all CPU cores
        params['n_jobs'] = -1
        model, metric = train_and_metric(logistic_regression, params,
                                         X_train, y_train,
                                         X_test, y_test)
        print(f'metric: {metric}')
        results.append({'model': model,
                        'metric': metric})

    # -------
    # XGBoost
    # -------

    for params in gen_xgb_parameters():
        print(f'calculating on {params}')
        # fix random_state
        params['random_state'] = 123
        # use all CPU cores
        params['n_jobs'] = -1
        model, metric = train_and_metric(xgb, params,
                                         X_train, y_train,
                                         X_test, y_test)
        print(f'metric: {metric}')
        results.append({'model': model,
                        'metric': metric})

It takes quite a long time to run, and we omit part of the output.

    calculating on {'max_depth': None, 'criterion': 'gini'}
    metric: 0.6964123781828575
    calculating on {'max_depth': None, 'criterion': 'entropy'}
    metric: 0.691231279083228
    # output of the other models omitted
    CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s
    Wall time: 31min 44s

Comparing the CPU time with the wall time shows that the training does make good use of multiple cores. Even so, the whole process took 31 minutes.
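The article collects each model and its metric in results but does not show the final selection step. A minimal sketch of that step, assuming entries shaped like those appended above, might look as follows. The helper best_result and the stand-in entries are hypothetical, and the metric values are made up for illustration.

```python
# Stand-in entries with the same shape as `results` in the article; the model
# names and metric values are made up for illustration only.
example_results = [
    {'model': 'model_a', 'metric': 0.70},
    {'model': 'model_b', 'metric': 0.55},
    {'model': 'model_c', 'metric': 0.62},
]


def best_result(results):
    # log_loss is a loss, so the entry with the smallest metric is the best.
    return min(results, key=lambda r: r['metric'])


best = best_result(example_results)
print(best['model'], best['metric'])
```

Because log_loss measures error, min is used rather than max. With a score such as accuracy the comparison would be reversed.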

Distributed acceleration with Remote API

Now let's try to speed up the whole process in a distributed way using Remote API.

For the cluster, we use the third option mentioned at the beginning and start a cluster directly on MaxCompute. The other options work equally well.

    n_cores = 8
    mem = 2 * n_cores  # 16G
    # o is the MaxCompute entry object; this creates a cluster of 10 workers,
    # each with 8 cores and 16G of memory
    cluster = o.create_mars_cluster(10, n_cores, mem, image='extended')

To make it easy to read the data in a distributed way, we change the data processing slightly and upload the data to a MaxCompute resource. In other environments, users can consider storage such as HDFS, Aliyun OSS, or Amazon S3.

    if not o.exist_resource('otto_train.csv'):
        with open('otto/train.csv') as f:
            # upload the resource
            o.create_resource('otto_train.csv', 'file', fileobj=f)

    def gen_data():
        # changed to read from the resource
        df = pd.read_csv(o.open_resource('otto_train.csv'))

        X = df.drop(['target', 'id'], axis=1)
        y = df['target']

        label_encoder = LabelEncoder()
        label_encoder.fit(y)
        y = label_encoder.transform(y)

        return train_test_split(X, y, test_size=0.33, random_state=123)

With a few changes, we use the mars.remote.spawn method to schedule gen_data to run on the cluster.

    import mars.remote as mr

    # n_output declares that gen_data returns 4 outputs
    # after execute(), the data has been read into the Mars cluster
    data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute()
    # names starting with remote_ are Mars objects; the data stays in the
    # cluster and these objects are only references to it
    remote_X_train, remote_X_test, remote_y_train, remote_y_test = data

At present, Mars can correctly serialize numpy ndarray, pandas DataFrame and similar objects, but cannot yet serialize models, so we make a small change to train_and_metric and pickle the model before returning it.

    def distributed_train_and_metric(train_func,
                                     train_params: dict,
                                     X_train: pd.DataFrame,
                                     y_train: pd.Series,
                                     X_test: pd.DataFrame,
                                     y_test: pd.Series,
                                     verbose: bool = False):
        model, metric = train_and_metric(train_func, train_params,
                                         X_train, y_train,
                                         X_test, y_test, verbose=verbose)
        return pickle.dumps(model), metric

Once Mars supports serializing models, the original function can be spawned directly.
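The pickling workaround relies on a simple round trip: pickle.dumps turns the model into bytes on the worker, and the isinstance(model, bytes) check in metric_model turns it back into an object. A small local sketch of that round trip, using a plain dictionary as a hypothetical stand-in for a trained model and a helper name (maybe_unpickle) introduced here, is shown below.

```python
import pickle

# Hypothetical stand-in for a trained model; any picklable object works.
stand_in_model = {'kind': 'stand-in model', 'weights': [0.1, 0.2, 0.7]}


def maybe_unpickle(model):
    # Same check metric_model performs: bytes are treated as a pickled model.
    if isinstance(model, bytes):
        model = pickle.loads(model)
    return model


payload = pickle.dumps(stand_in_model)   # what the remote function returns
restored = maybe_unpickle(payload)       # what the caller works with
untouched = maybe_unpickle(stand_in_model)

print(type(payload).__name__, restored == stand_in_model)
```

The same helper accepts an already unpickled object unchanged, which is why one metric_model can serve both the single-machine and the distributed version.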

Next we modify the earlier execution process slightly, rewriting all the function calls with mars.remote.spawn.

    import numpy as np

    tasks = []
    models = []
    metrics = []

    # -------------
    # Random Forest
    # -------------

    for params in gen_random_forest_parameters():
        # fix random_state
        params['random_state'] = 123
        task = mr.spawn(distributed_train_and_metric,
                        args=(random_forest, params,
                              remote_X_train, remote_y_train,
                              remote_X_test, remote_y_test),
                        kwargs={'verbose': 2},
                        n_output=2)
        tasks.extend(task)
        # store the model and the metric separately
        models.append(task[0])
        metrics.append(task[1])

    # -------------------
    # Logistic Regression
    # -------------------

    for params in gen_lr_parameters():
        # fix random_state
        params['random_state'] = 123
        task = mr.spawn(distributed_train_and_metric,
                        args=(logistic_regression, params,
                              remote_X_train, remote_y_train,
                              remote_X_test, remote_y_test),
                        kwargs={'verbose': 2},
                        n_output=2)
        tasks.extend(task)
        # store the model and the metric separately
        models.append(task[0])
        metrics.append(task[1])

    # -------
    # XGBoost
    # -------

    for params in gen_xgb_parameters():
        # fix random_state
        params['random_state'] = 123
        # set the concurrency to the number of cores
        params['n_jobs'] = n_cores
        task = mr.spawn(distributed_train_and_metric,
                        args=(xgb, params,
                              remote_X_train, remote_y_train,
                              remote_X_test, remote_y_test),
                        kwargs={'verbose': 2},
                        n_output=2)
        tasks.extend(task)
        # store the model and the metric separately
        models.append(task[0])
        metrics.append(task[1])

    # shuffle the order so that the tasks are spread evenly across workers
    shuffled_tasks = np.random.permutation(tasks)
    _ = mr.ExecutableTuple(shuffled_tasks).execute()

You can see that the code is almost identical.

Run to view the results:

    CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms
    Wall time: 1min 59s

The running time dropped from 31 minutes to 2 minutes, a speedup of more than 15x, while the cost of the code changes is negligible.
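The speedup can be checked directly from the two wall times reported above (31min 44s on a single machine, 1min 59s on the cluster). The helper to_seconds is a name introduced here for the arithmetic.

```python
def to_seconds(minutes, seconds):
    # Convert a "Xmin Ys" wall time into seconds.
    return minutes * 60 + seconds


single_machine = to_seconds(31, 44)  # wall time of the single-machine run
distributed = to_seconds(1, 59)      # wall time of the distributed run

speedup = single_machine / distributed
print(f'{single_machine} s / {distributed} s = {speedup:.1f}x')
```

The ratio works out to about 16x, which is consistent with the "more than 15x" figure quoted in the text.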

Careful readers may have noticed that in the distributed code we turned on verbose for the models. In a distributed environment these functions execute remotely, so anything they print goes only to the standard output stream of the worker and does not show up on the client. Mars, however, provides a very useful interface for viewing the runtime output of each model.

Taking the 0th model as an example, we can call the fetch_log method directly on the Mars object.

    print(models[0].fetch_log())

Part of the output is shown below.

    building tree 1 of 50
    building tree 2 of 50
    building tree 3 of 50
    building tree 4 of 50
    building tree 5 of 50
    building tree 6 of 50
    # middle part omitted
    building tree 49 of 50
    building tree 50 of 50

The output of any model can be viewed this way. Imagine how troublesome it would be to see the intermediate output without the fetch_log API. First, you do not know which worker the function ran on. Then, even if you do, each worker may be running more than one function, so the output may be interleaved or even drowned out by a large volume of logs. With the fetch_log interface, users need not care which worker a function ran on, nor worry about logs getting mixed together.
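The idea of keeping one log per function call can be illustrated locally. The sketch below is only an analogy and makes no claim about how Mars implements fetch_log: it captures what each call prints into its own buffer using the standard library. The names run_with_log and noisy_fit are hypothetical.

```python
import io
from contextlib import redirect_stdout


def run_with_log(func, *args):
    # Capture everything func prints into a buffer that belongs to this call.
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        result = func(*args)
    return result, buffer.getvalue()


def noisy_fit(name, n_trees):
    # Hypothetical stand-in for a verbose training function.
    for i in range(1, n_trees + 1):
        print(f'[{name}] building tree {i} of {n_trees}')
    return name


calls = [run_with_log(noisy_fit, 'model-0', 2),
         run_with_log(noisy_fit, 'model-1', 3)]
logs = [log for _, log in calls]

# Each log contains only the lines printed by its own call.
print(logs[0], end='')
```

Keeping the output keyed by call rather than by machine is what removes the two problems described above: there is no need to know where a function ran, and lines from different functions never end up in the same stream.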

To learn about the fetch_log interface, check the documentation.
