This article is about how to access horizontally sharded MySQL tables concurrently in Python 3. The editor found it very practical, so it is shared here as a reference. Let's follow along and have a look.
Scenario description
Suppose you have a MySQL table that is split horizontally across multiple hosts, with each host holding n shard tables.
How can you access all of these tables concurrently and get the query results back quickly?
Here is a solution that uses Python 3's asyncio asynchronous I/O library together with the aiomysql asynchronous MySQL client to implement this requirement.
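Before the full demonstration, here is a minimal sketch of the core pattern the solution builds on: open an aiomysql connection pool for one host and run several shard-table queries concurrently with asyncio.gather. The host address, credentials, database name, table names, the SQL, and the helper names fetch_one_table / fetch_host below are placeholders for illustration, not values from the article.

import asyncio
from aiomysql import create_pool

async def fetch_one_table(pool, table):
    # run one query against a single shard table
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("select count(*) from {}".format(table))
            return await cur.fetchone()

async def fetch_host(ip):
    # one connection pool per host, then query its tables concurrently
    async with create_pool(host=ip, db="your_dbname",
                           user="xxx", password="xxxx") as pool:
        tables = ["table_000", "table_001", "table_002"]
        return await asyncio.gather(*(fetch_one_table(pool, t) for t in tables))

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(fetch_host("192.168.1.01")))
    loop.close()

The full demo below extends this idea: it enumerates every shard table on every host, shuffles them into a task queue, and drains the queue in batches.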
The code demonstration:

import logging
import random
import re
import asyncio

from aiomysql import create_pool

# Assume the MySQL table is sharded across 8 hosts, each host holding 16 child tables.
# "table_000-015" means the host has the consecutive tables table_000 through table_015.
TBLES = {
    "192.168.1.01": "table_000-015",
    "192.168.1.02": "table_016-031",
    "192.168.1.03": "table_032-047",
    "192.168.1.04": "table_048-063",
    "192.168.1.05": "table_064-079",
    "192.168.1.06": "table_080-095",
    "192.168.1.07": "table_096-111",
    "192.168.1.08": "table_112-127",
}
USER = "xxx"
PASSWD = "xxxx"

# Wrapper (decorator) used to catch exceptions raised by the query coroutine
def query_wrapper(func):
    async def wrapper(*args, **kwargs):
        try:
            await func(*args, **kwargs)
        except Exception as e:
            print(e)
    return wrapper

# Actual SQL handler: an asynchronous, non-blocking request through aiomysql
@query_wrapper
async def query_do_something(ip, db, table):
    async with create_pool(host=ip, db=db, user=USER, password=PASSWD) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                sql = "select xxx from {} where xxxx"
                await cur.execute(sql.format(table))
                res = await cur.fetchall()
                # then do something...

# Generate the SQL task queue; each element holds the handler function
# for one table plus its arguments
def gen_tasks():
    tasks = []
    for ip, tbls in TBLES.items():
        cols = re.split('_|-', tbls)
        tblpre = "_".join(cols[:-2])
        min_num = int(cols[-2])
        max_num = int(cols[-1])
        for num in range(min_num, max_num + 1):
            # zero-pad the suffix to match table names such as table_000
            tasks.append((query_do_something, ip, 'your_dbname',
                          '{}_{:03d}'.format(tblpre, num)))
    random.shuffle(tasks)
    return tasks

# Run the SQL task queue, batch_len tasks at a time
def run_tasks(loop, tasks, batch_len):
    try:
        for idx in range(0, len(tasks), batch_len):
            batch_tasks = tasks[idx: idx + batch_len]
            logging.info("current batch, start_idx:%s len:%s" % (idx, len(batch_tasks)))
            for i in range(0, len(batch_tasks)):
                t = batch_tasks[i]
                batch_tasks[i] = asyncio.ensure_future(t[0](*t[1:]))
            loop.run_until_complete(asyncio.gather(*batch_tasks))
    except Exception as e:
        logging.warning(e)

# Main entry: drive the whole run asynchronously through asyncio
def main():
    loop = asyncio.get_event_loop()
    tasks = gen_tasks()
    batch_len = len(TBLES.keys()) * 5  # all up to you
    run_tasks(loop, tasks, batch_len)
    loop.close()

if __name__ == '__main__':
    main()

Thank you for reading! This is the end of how to implement concurrent access to horizontally sharded tables in Python 3. I hope the above content is helpful to you and helps you learn more. If you think the article is good, you can share it and let more people see it.