2025-04-02 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
In the era of big data, single-machine modeling can hardly keep up with enterprises' growing data volumes, so developers need to build models on a cluster using distributed development. However, single-machine code and distributed code differ in several ways. This article introduces two approaches to distributed development based on TensorFlow, to help developers choose the right direction for their modules in practice.
Native distributed development based on TensorFlow
Distributed development requires choosing how gradients are updated. There are two schemes: synchronous and asynchronous. Synchronous updates tend to converge better in terms of model quality, while asynchronous updates iterate faster. The two update schemes are illustrated below:
Synchronous update process
(image source: TensorFlow: Large-Scale Machine Learning on Heterogeneous Distributed Systems)
Asynchronous update process
(image source: TensorFlow: Large-Scale Machine Learning on Heterogeneous Distributed Systems)
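To make the difference between the two schemes concrete, here is a minimal pure-Python sketch that does not use TensorFlow. It rests on simplifying assumptions that are not from the article: a toy one-parameter loss, two hypothetical workers with one data point each, and an asynchronous model in which every worker reads the parameters at the start of a round and the ps applies each gradient as it arrives.

```python
# Toy model: worker k holds one data point t_k and minimises (w - t_k)**2.
# The shared optimum is the mean of the targets.
TARGETS = [2.0, 4.0]      # hypothetical per-worker data, mean = 3.0
LEARNING_RATE = 0.1       # hypothetical step size


def worker_gradient(w, target):
    """Gradient of (w - target)**2 with respect to w."""
    return 2.0 * (w - target)


def synchronous_round(w):
    """All workers read the same w; the ps averages their gradients
    and applies a single update."""
    grads = [worker_gradient(w, t) for t in TARGETS]
    w = w - LEARNING_RATE * sum(grads) / len(grads)
    return w, 1  # one parameter update per round


def asynchronous_round(w):
    """All workers read w at the start of the round; the ps then applies
    each gradient as soon as it arrives, without waiting for the others.
    Later gradients are therefore computed from stale parameters."""
    stale_w = w
    updates = 0
    for t in TARGETS:
        w = w - LEARNING_RATE * worker_gradient(stale_w, t)
        updates += 1
    return w, updates


def train(round_fn, rounds=50, w=0.0):
    """Run the given round function repeatedly and count parameter updates."""
    total_updates = 0
    for _ in range(rounds):
        w, n = round_fn(w)
        total_updates += n
    return w, total_updates


w_sync, updates_sync = train(synchronous_round)
w_async, updates_async = train(asynchronous_round)
print("sync :", round(w_sync, 4), "after", updates_sync, "updates")
print("async:", round(w_async, 4), "after", updates_async, "updates")
```

In this toy setting both schemes reach the same optimum. The asynchronous one applies twice as many parameter updates in the same number of rounds, at the cost of using stale parameters for some gradients.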
Native distributed TensorFlow is built on parameter servers (ps) and worker servers. The ps servers only aggregate and update the parameters, while each worker computes the gradients.
The specific process for native TensorFlow distributed development is as follows:
First, start the ps server by passing --job_name=ps:
    python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=ps --task_index=0
Then start the worker servers (two worker nodes here) by passing --job_name=worker:
    python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=worker --task_index=0
    python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=worker --task_index=1
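Every task is started with the same host lists and differs only in its job name and task index. The following sketch generates one command line per task from the two host lists. The helper launch_commands is hypothetical and only builds strings; it does not start any process.

```python
def launch_commands(script, ps_hosts, worker_hosts):
    """Return (host, command) pairs, one per cluster task, ps tasks first."""
    shared = "--ps_hosts={} --worker_hosts={}".format(
        ",".join(ps_hosts), ",".join(worker_hosts))
    commands = []
    for job_name, hosts in (("ps", ps_hosts), ("worker", worker_hosts)):
        # task_index is the position of the host inside its own job list
        for task_index, host in enumerate(hosts):
            command = "python {} {} --job_name={} --task_index={}".format(
                script, shared, job_name, task_index)
            commands.append((host, command))
    return commands


commands = launch_commands(
    "distribute.py",
    ps_hosts=["192.168.100.42:2222"],
    worker_hosts=["192.168.100.42:2224", "192.168.100.253:2225"],
)
for host, command in commands:
    print(host, "->", command)
```

Each command has to be run on the machine that owns the corresponding host:port entry.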
After that, the script must accept and use the parameters specified above (worker_hosts, ps_hosts, job_name, and task_index):
    tf.app.flags.DEFINE_string("worker_hosts", "default", "description")
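For illustration, the sketch below parses all four parameters and turns the host strings into lists. It uses the standard-library argparse as a stand-in for tf.app.flags so that it runs without TensorFlow; the function names, help texts, and default values are assumptions.

```python
import argparse


def parse_cluster_flags(argv):
    """Parse the four cluster parameters from a list of command-line tokens."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--ps_hosts", default="",
                        help="comma-separated host:port list of ps tasks")
    parser.add_argument("--worker_hosts", default="",
                        help="comma-separated host:port list of worker tasks")
    parser.add_argument("--job_name", choices=["ps", "worker"], required=True,
                        help="role of this process")
    parser.add_argument("--task_index", type=int, default=0,
                        help="index of this process within its job")
    return parser.parse_args(argv)


def cluster_dict(flags):
    """Build a job-name -> host-list mapping from the parsed flags."""
    return {
        "ps": flags.ps_hosts.split(","),
        "worker": flags.worker_hosts.split(","),
    }


flags = parse_cluster_flags([
    "--ps_hosts=192.168.100.42:2222",
    "--worker_hosts=192.168.100.42:2224,192.168.100.253:2225",
    "--job_name=worker",
    "--task_index=1",
])
print(flags.job_name, flags.task_index, cluster_dict(flags))
```

A mapping of this shape, from job name to host list, is what a cluster specification is built from.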
After receiving the parameters, register the ps and worker servers so that each performs its own role:
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
    server = tf.train.Server(cluster,
                             job_name=FLAGS.job_name,
                             task_index=FLAGS.task_index)
    issync = FLAGS.issync
    if FLAGS.job_name == "ps":
        # A ps process only serves parameters, so it blocks here.
        server.join()
    elif FLAGS.job_name == "worker":
        # Variables go to the ps tasks; other ops stay on this worker.
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % FLAGS.task_index,
                cluster=cluster)):
            ...  # the model definition follows here (omitted in the source)
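The effect of tf.train.replica_device_setter can be pictured with a small stand-alone sketch. It models only the default behaviour as I understand it: variables are assigned to ps tasks in round-robin order and all other operations stay on the local worker. The function place_variables and the variable names are hypothetical, and the real function handles more cases than this.

```python
def place_variables(variable_names, num_ps_tasks, worker_task_index):
    """Simplified model of round-robin variable placement.

    Returns a (placement, worker_device) pair, where placement maps each
    variable name to the ps device that would hold it.
    """
    placement = {}
    for i, name in enumerate(variable_names):
        # Rotate over the ps tasks in creation order.
        placement[name] = "/job:ps/task:%d" % (i % num_ps_tasks)
    worker_device = "/job:worker/task:%d" % worker_task_index
    return placement, worker_device


placement, worker_device = place_variables(
    ["w1", "b1", "w2", "b2"], num_ps_tasks=2, worker_task_index=1)
for name, device in placement.items():
    print(name, "->", device)
print("compute ops ->", worker_device)
```

With a single ps task, as in the commands shown earlier, every variable lands on /job:ps/task:0.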
Then update the gradient.
(1) Synchronous gradient update:
    rep_op = tf.train.SyncReplicasOptimizer(
        optimizer,
        replicas_to_aggregate=len(worker_hosts),
        replica_id=FLAGS.task_index,
        total_num_replicas=len(worker_hosts),
        use_locking=True)
    train_op = rep_op.apply_gradients(grads_and_vars, global_step=global_step)
    init_token_op = rep_op.get_init_tokens_op()
    chief_queue_runner = rep_op.get_chief_queue_runner()
(2) Asynchronous gradient update:
    train_op = optimizer.apply_gradients(grads_and_vars, global_step=global_step)
Finally, use tf.train.Supervisor to run the actual training iterations.
Note that for synchronous gradient updates, the following code must also be added (sv is the tf.train.Supervisor instance and sess is its session):
    sv.start_queue_runners(sess, [chief_queue_runner])
    sess.run(init_token_op)
Note that the native approach above requires you to specify the cluster IPs and ports explicitly. Alternatively, developers can use TensorFlowOnSpark together with YARN to manage them.
Distributed development based on TensorFlowOnSpark
The user behavior prediction module of "the number", a mobile app data statistics and analysis product for developers, is built on distributed TensorFlowOnSpark. Distributed development based on TensorFlowOnSpark not only hides ports and machine IPs, but also achieves better resource requests and allocation. Even when multiple ten-million-scale applications are modeled at the same time, the cluster performs well, and the corresponding resources and processes can be inspected in the Spark UI. Most importantly, TensorFlowOnSpark makes the code easy to modify and easy to deploy when moving from single-machine to distributed training.
The specific process of distributed development based on TensorFlowOnSpark is as follows:
First, submit the job with spark-submit, specifying the parameters Spark needs (--num-executors 6, etc.), the model code, the model hyperparameters, and so on. The script also needs to accept external arguments:
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--tracks", help="dataset path")
    args = parser.parse_args()
After that, prepare the parameters and the training data (a DataFrame), and call the model API to start training.
In the call, soft_dist.map_fun is the function to be invoked, followed by the model training parameters:
    estimator = TFEstimator(soft_dist.map_fun, args) \
        .setInputMapping({'tracks': 'tracks', 'label': 'label'}) \
        .setModelDir(args.model) \
        .setExportDir(args.serving) \
        .setClusterSize(args.cluster_size) \
        .setNumPS(num_ps) \
        .setEpochs(args.epochs) \
        .setBatchSize(args.batch_size) \
        .setSteps(args.max_steps)
    model = estimator.fit(df)
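The estimator configuration reads several attributes of args (model, serving, cluster_size, epochs, batch_size, max_steps) that the article's parser snippet does not define. A possible parser is sketched below. The long flag names are inferred from those attribute names, while the short flag -i, the defaults, the help texts, the sample path, and the value of num_ps are all assumptions.

```python
import argparse


def build_parser():
    """Hypothetical parser covering the attributes read from `args`."""
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--tracks", help="dataset path")
    parser.add_argument("--model", default="model", help="model directory")
    parser.add_argument("--serving", default="serving", help="export directory")
    parser.add_argument("--cluster_size", type=int, default=6,
                        help="number of nodes in the cluster")
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--batch_size", type=int, default=100)
    parser.add_argument("--max_steps", type=int, default=1000)
    parser.add_argument("--rdma", action="store_true", help="use RDMA transport")
    return parser


args = build_parser().parse_args(
    ["--tracks", "hdfs:///path/to/tracks", "--cluster_size", "6", "--epochs", "2"])

num_ps = 1  # assumption: one parameter server
# Assumption: the cluster size counts ps nodes too, so the rest are workers.
num_workers = args.cluster_size - num_ps
print(args.tracks, args.cluster_size, args.epochs, num_workers)
```

The value given to --cluster_size would normally match the --num-executors value passed to spark-submit.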
Next, the soft_dist module defines the function map_fun(args, ctx):
    def map_fun(args, ctx):
        ...
        worker_num = ctx.worker_num    # number of workers
        job_name = ctx.job_name        # job name
        task_index = ctx.task_index    # task index
        if job_name == "ps":           # ps node (master)
            time.sleep((worker_num + 1) * 5)
        cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)
        num_workers = len(cluster.as_dict()['worker'])
        if job_name == "ps":
            server.join()
        elif job_name == "worker":
            with tf.device(tf.train.replica_device_setter(
                    worker_device="/job:worker/task:%d" % task_index,
                    cluster=cluster)):
                ...  # the model definition follows here (omitted in the source)
After that, you can use the high-level tf.train.MonitoredTrainingSession API for model training and prediction.
Summary
TensorFlow-based distributed development broadly falls into the two approaches introduced in this article. The second, TensorFlowOnSpark, is suitable for real production environments and offers higher stability.
Developers can also set up an email notification at the end of a run to stay informed about how the model performed.
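One possible way to send such a notification uses only the Python standard library. The article does not say how its notifications are implemented, so the function names, addresses, subject format, and SMTP host below are all hypothetical.

```python
import smtplib
from email.message import EmailMessage


def build_notification(job_name, succeeded, sender, recipient, details=""):
    """Create the notification email for a finished training run."""
    status = "finished" if succeeded else "failed"
    msg = EmailMessage()
    msg["Subject"] = "[training] {} {}".format(job_name, status)
    msg["From"] = sender
    msg["To"] = recipient
    msg.set_content(details or "No further details.")
    return msg


def send_notification(msg, host="localhost", port=25):
    """Send the message through an SMTP relay (host and port are assumptions)."""
    with smtplib.SMTP(host, port) as smtp:
        smtp.send_message(msg)


msg = build_notification(
    job_name="user_behavior_model",
    succeeded=True,
    sender="trainer@example.com",
    recipient="dev@example.com",
    details="Reached max_steps.",
)
print(msg["Subject"])
# send_notification(msg)  # uncomment once an SMTP relay is reachable
```

Calling build_notification in a finally block around the training loop, with succeeded set accordingly, would also report failed runs.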
In addition, developers who use SessionRunHook to save the final model should be aware of a bug in the framework code: the model can only be saved within a specified time limit, and once that limit is exceeded the program is forcibly terminated even if the run has not finished. If you are using a version in which this bug has not been fixed, you need to work around it yourself by extending the allowed running time.