The process of implementing WordCount algorithm based on Cloud function SCF and object Storage

2025-01-17 Update From: SLTechnology News&Howtos shulou


Shulou(Shulou.com)06/01 Report--

This article walks you through implementing the WordCount algorithm with cloud function SCF and object storage. The content is concise and easy to follow; I hope the detailed introduction below gives you something useful.

We will implement a simple WordCount algorithm using the MapReduce model, but instead of a traditional big data framework such as Hadoop, we will use cloud function SCF together with object storage COS.

MapReduce is explained in Wikipedia as follows:

MapReduce is a software architecture proposed by Google for parallel computing over large-scale data sets (larger than 1 TB). The concepts "Map" and "Reduce" and their main ideas are borrowed from functional programming languages, along with features borrowed from vector programming languages.

From this description we know that MapReduce is a computing model, framework, and platform for parallel processing of big data. Traditionally, MapReduce work is carried out on distributed frameworks such as Hadoop; with the steady development of cloud computing, cloud vendors have also launched online MapReduce services.
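Before moving to the cloud version, the model itself can be sketched in a few lines of plain Python. This is only an illustration of the Map and Reduce phases, not part of the SCF code that follows:

```python
from collections import defaultdict

def map_phase(text):
    # Map: emit a (word, 1) pair for every word in the input.
    return [(word, 1) for word in text.split()]

def reduce_phase(pairs):
    # Reduce: sum the counts emitted for each distinct word.
    counts = defaultdict(int)
    for word, count in pairs:
        counts[word] += count
    return dict(counts)

pairs = map_phase("the quick fox and the lazy fox")
print(reduce_phase(pairs))  # {'the': 2, 'quick': 1, 'fox': 2, 'and': 1, 'lazy': 1}
```

In the cloud variant below, the pair list in the middle does not live in memory: the Mapper writes it to an intermediate bucket, and the Reducer picks it up from there.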

Theoretical basis

Before we begin, let's draw a simple flow chart according to the requirements of MapReduce:

In this structure, we need two cloud functions, serving as Mapper and Reducer, and three object storage buckets, serving as the input bucket, the intermediate cache bucket, and the result bucket. Since our functions will be deployed in Guangzhou, we create all three buckets in the Guangzhou region:

Object Storage 1: ap-guangzhou, srcmr
Object Storage 2: ap-guangzhou, middlestagebucket
Object Storage 3: ap-guangzhou, destmr
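The data flow between the three buckets can be simulated locally, with plain dictionaries standing in for the COS buckets. This is only a sketch of the flow; the key names are illustrative:

```python
# Dictionaries stand in for the three COS buckets in the flow chart.
src_bucket, middle_bucket, dest_bucket = {}, {}, {}

def mapper_fn(key):
    # Triggered by an upload to the source bucket: emit one "word\t1" line per word.
    out = []
    for line in src_bucket[key].splitlines():
        for word in line.split():
            out.append('%s\t%s' % (word, 1))
    middle_bucket['middle_' + key] = '\n'.join(out)

def reducer_fn(key):
    # Triggered by an upload to the middle bucket: sum the counts per word.
    counts = {}
    for line in middle_bucket[key].splitlines():
        word, count = line.split('\t', 1)
        counts[word] = counts.get(word, 0) + int(count)
    dest_bucket['result_' + key] = '\n'.join('%s\t%d' % kv for kv in counts.items())

src_bucket['input.txt'] = 'hello world\nhello scf'
mapper_fn('input.txt')            # srcmr -> middlestagebucket
reducer_fn('middle_input.txt')    # middlestagebucket -> destmr
print(dest_bucket['result_middle_input.txt'])
```

In the real deployment, each "upload" on the left of an arrow is a COS event that triggers the next function in the chain.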

To make the overall Mapper and Reducer logic clearer, we modify the traditional WordCount structure to better suit cloud functions, and divide the work between Mapper and Reducer accordingly:

Function realization

First, create the mapper.py file and write the Mapper logic:

```python
# -*- coding: utf8 -*-
import datetime
import logging
import os
import re
import sys

from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
from qcloud_cos_v5 import CosServiceError

logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)

region = 'ap-guangzhou'                    # modify the region according to your setup
middle_stage_bucket = 'middlestagebucket'  # modify the bucket name according to your setup


def delete_file_folder(src):
    if os.path.isfile(src):
        try:
            os.remove(src)
        except OSError:
            pass
    elif os.path.isdir(src):
        for item in os.listdir(src):
            itemsrc = os.path.join(src, item)
            delete_file_folder(itemsrc)
        try:
            os.rmdir(src)
        except OSError:
            pass


def download_file(cos_client, bucket, key, download_path):
    logger.info("Get from [%s] to download file [%s]" % (bucket, key))
    try:
        response = cos_client.get_object(Bucket=bucket, Key=key)
        response['Body'].get_stream_to_file(download_path)
    except CosServiceError as e:
        print(e.get_error_code())
        print(e.get_error_msg())
        return -1
    return 0


def upload_file(cos_client, bucket, key, local_file_path):
    logger.info("Start to upload file to cos")
    try:
        response = cos_client.put_object_from_local_file(
            Bucket=bucket,
            LocalFilePath=local_file_path,
            Key='{}'.format(key))
    except CosServiceError as e:
        print(e.get_error_code())
        print(e.get_error_msg())
        return -1
    logger.info("Upload data map file [%s] Success" % key)
    return 0


def do_mapping(cos_client, bucket, key, middle_stage_bucket, middle_file_key):
    src_file_path = u'/tmp/' + key.split('/')[-1]
    middle_file_path = u'/tmp/' + u'middle_' + key.split('/')[-1]
    download_ret = download_file(cos_client, bucket, key, src_file_path)
    if download_ret == 0:
        inputfile = open(src_file_path, 'r')   # open the local /tmp file
        mapfile = open(middle_file_path, 'w')  # open a new file write stream
        for line in inputfile:
            line = re.sub('[^a-zA-Z0-9]', ' ', line)  # replace non-alphanumeric characters
            words = line.split()
            for word in words:
                mapfile.write('%s\t%s' % (word, 1))   # emit a count of 1 for each word
                mapfile.write('\n')
        inputfile.close()
        mapfile.close()
        upload_ret = upload_file(cos_client, middle_stage_bucket, middle_file_key,
                                 middle_file_path)  # upload the intermediate file
        delete_file_folder(src_file_path)
        delete_file_folder(middle_file_path)
        return upload_ret
    else:
        return -1


def map_caller(event, context, cos_client):
    appid = event['Records'][0]['cos']['cosBucket']['appid']
    bucket = event['Records'][0]['cos']['cosBucket']['name'] + '-' + appid
    key = event['Records'][0]['cos']['cosObject']['key']
    key = key.replace('/' + str(appid) + '/'
                      + event['Records'][0]['cos']['cosBucket']['name'] + '/', '', 1)
    logger.info("Key is " + key)
    middle_bucket = middle_stage_bucket + '-' + appid
    middle_file_key = '/' + 'middle_' + key.split('/')[-1]
    return do_mapping(cos_client, bucket, key, middle_bucket, middle_file_key)


def main_handler(event, context):
    logger.info("start main handler")
    if "Records" not in event.keys():
        return {"errorMsg": "event is not come from cos"}
    secret_id = ""   # fill in your SecretId
    secret_key = ""  # fill in your SecretKey
    config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key)
    cos_client = CosS3Client(config)
    start_time = datetime.datetime.now()
    res = map_caller(event, context, cos_client)
    end_time = datetime.datetime.now()
    print("data mapping duration: " + str((end_time - start_time).microseconds / 1000) + "ms")
    if res == 0:
        return "Data mapping SUCCESS"
    else:
        return "Data mapping FAILED"
```
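The `map_caller` logic above derives the full bucket name and the bare object key from the COS trigger event. That parsing can be checked in isolation; the event payload below is a hypothetical example in the shape the code expects, with an illustrative file name:

```python
# A hypothetical COS trigger event in the shape main_handler expects.
event = {
    'Records': [{
        'cos': {
            'cosBucket': {'appid': '1256773370', 'name': 'srcmr'},
            'cosObject': {'key': '/1256773370/srcmr/novel.txt'},
        }
    }]
}

record = event['Records'][0]['cos']
appid = record['cosBucket']['appid']
bucket = record['cosBucket']['name'] + '-' + appid   # full bucket name is "name-appid"
key = record['cosObject']['key']
# Strip the "/appid/bucketname/" prefix to get the bare object key.
key = key.replace('/' + str(appid) + '/' + record['cosBucket']['name'] + '/', '', 1)

print(bucket)  # srcmr-1256773370
print(key)     # novel.txt
```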

In the same way, create the reducer.py file and write the Reducer logic as follows:

```python
# -*- coding: utf8 -*-
import datetime
import logging
import os
import sys
from operator import itemgetter

from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
from qcloud_cos_v5 import CosServiceError

region = 'ap-guangzhou'   # modify the region according to your setup
result_bucket = 'destmr'  # modify the bucket name according to your setup

logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)


def delete_file_folder(src):
    if os.path.isfile(src):
        try:
            os.remove(src)
        except OSError:
            pass
    elif os.path.isdir(src):
        for item in os.listdir(src):
            itemsrc = os.path.join(src, item)
            delete_file_folder(itemsrc)
        try:
            os.rmdir(src)
        except OSError:
            pass


def download_file(cos_client, bucket, key, download_path):
    logger.info("Get from [%s] to download file [%s]" % (bucket, key))
    try:
        response = cos_client.get_object(Bucket=bucket, Key=key)
        response['Body'].get_stream_to_file(download_path)
    except CosServiceError as e:
        print(e.get_error_code())
        print(e.get_error_msg())
        return -1
    return 0


def upload_file(cos_client, bucket, key, local_file_path):
    logger.info("Start to upload file to cos")
    try:
        response = cos_client.put_object_from_local_file(
            Bucket=bucket,
            LocalFilePath=local_file_path,
            Key='{}'.format(key))
    except CosServiceError as e:
        print(e.get_error_code())
        print(e.get_error_msg())
        return -1
    logger.info("Upload data map file [%s] Success" % key)
    return 0


def qcloud_reducer(cos_client, bucket, key, result_bucket, result_key):
    word2count = {}
    src_file_path = u'/tmp/' + key.split('/')[-1]
    result_file_path = u'/tmp/' + u'result_' + key.split('/')[-1]
    download_ret = download_file(cos_client, bucket, key, src_file_path)
    if download_ret == 0:
        map_file = open(src_file_path, 'r')
        result_file = open(result_file_path, 'w')
        for line in map_file:
            line = line.strip()
            word, count = line.split('\t', 1)
            try:
                count = int(count)
                word2count[word] = word2count.get(word, 0) + count
            except ValueError:
                logger.error("error value: %s, current line: %s" % (ValueError, line))
                continue
        map_file.close()
        delete_file_folder(src_file_path)
        sorted_word2count = sorted(word2count.items(), key=itemgetter(1))[::-1]  # sort by count, descending
        for wordcount in sorted_word2count:
            res = '%s\t%s' % (wordcount[0], wordcount[1])
            result_file.write(res)
            result_file.write('\n')
        result_file.close()
        upload_ret = upload_file(cos_client, result_bucket, result_key, result_file_path)
        delete_file_folder(result_file_path)
        return upload_ret
    else:
        return -1


def reduce_caller(event, context, cos_client):
    appid = event['Records'][0]['cos']['cosBucket']['appid']
    bucket = event['Records'][0]['cos']['cosBucket']['name'] + '-' + appid
    key = event['Records'][0]['cos']['cosObject']['key']
    key = key.replace('/' + str(appid) + '/'
                      + event['Records'][0]['cos']['cosBucket']['name'] + '/', '', 1)
    logger.info("Key is " + key)
    res_bucket = result_bucket + '-' + appid
    result_key = '/' + 'result_' + key.split('/')[-1]
    return qcloud_reducer(cos_client, bucket, key, res_bucket, result_key)


def main_handler(event, context):
    logger.info("start main handler")
    if "Records" not in event.keys():
        return {"errorMsg": "event is not come from cos"}
    secret_id = ""   # fill in your SecretId
    secret_key = ""  # fill in your SecretKey
    config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key)
    cos_client = CosS3Client(config)
    start_time = datetime.datetime.now()
    res = reduce_caller(event, context, cos_client)
    end_time = datetime.datetime.now()
    print("data reducing duration: " + str((end_time - start_time).microseconds / 1000) + "ms")
    if res == 0:
        return "Data reducing SUCCESS"
    else:
        return "Data reducing FAILED"
```

Deployment and testing

Following the Serverless Framework yaml specification, write serverless.yaml:

```yaml
WordCountMapper:
  component: "@serverless/tencent-scf"
  inputs:
    name: mapper
    codeUri: ./code
    handler: index.main_handler
    runtime: Python3.6
    region: ap-guangzhou
    description: website Monitoring
    memorySize: 64
    timeout: 20
    events:
      - cos:
          name: srcmr-1256773370.cos.ap-guangzhou.myqcloud.com
          parameters:
            bucket: srcmr-1256773370.cos.ap-guangzhou.myqcloud.com
            filter:
              prefix: ''
              suffix: ''
            events: cos:ObjectCreated:*
            enable: true

WordCountReducer:
  component: "@serverless/tencent-scf"
  inputs:
    name: reducer
    codeUri: ./code
    handler: index.main_handler
    runtime: Python3.6
    region: ap-guangzhou
    description: website Monitoring
    memorySize: 64
    timeout: 20
    events:
      - cos:
          name: middlestagebucket-1256773370.cos.ap-guangzhou.myqcloud.com
          parameters:
            bucket: middlestagebucket-1256773370.cos.ap-guangzhou.myqcloud.com
            filter:
              prefix: ''
              suffix: ''
            events: cos:ObjectCreated:*
            enable: true
```

When done, deploy with the `sls --debug` command. After the deployment succeeds, run a basic test:

Prepare an English document:

Log in to the Tencent Cloud console, open the srcmr bucket we created earlier, and upload the file.

After the upload succeeds, wait a moment and you can see that the Mapper and then the Reducer have been executed and have generated logs:

At this point, we open the result bucket and view the results:

Now, we have completed the simple word frequency statistics function.
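As a quick sanity check on the output, the `word\tcount` lines written to the result bucket can be parsed back locally. The text below is an illustrative stand-in for a downloaded result file:

```python
# Result lines as the reducer writes them: "word\tcount", one word per line.
result_text = "cloud\t3\nserverless\t2\nscf\t1"

counts = {}
for line in result_text.splitlines():
    word, count = line.split('\t', 1)
    counts[word] = int(count)

print(counts)  # {'cloud': 3, 'serverless': 2, 'scf': 1}
```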

Summary

The Serverless architecture is well suited to big data processing. On Tencent Cloud's official website, we can also see its scenario description for data ETL processing:

This example also involves deploying multiple functions in one step. In real production, a project is rarely a single function on its own; it is usually a combination of functions forming a service, which makes one-click deployment of multiple functions especially important. Hopefully this example gives readers a better sense of where the Serverless architecture applies, and inspires them to combine cloud functions with different triggers in their own business.

The above is the process of implementing the WordCount algorithm with cloud function SCF and object storage. Did you learn anything new from it? If you want to learn more skills or enrich your knowledge, you are welcome to follow the industry information channel.
