In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-14 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly introduces Python how to achieve real-time incremental data loading tool, the article is very detailed, has a certain reference value, interested friends must read it!
Combined with the practical application case of singleton model: the solution of real-time incremental data loading tool is realized. The most important thing is to implement an incremental ID record table that can be added, modified, deleted, and so on.
Singleton pattern: provides a global access point to ensure that the class has and has only one specific type of object. It is usually used in the following scenarios: logging or database operations, etc., to avoid conflicts with a resource request.
Create incremental ID record table import sqlite3import datetimeimport pymssqlimport pandas as pdimport timepd.set_option ('expand_frame_repr', False)
Import required modules
# create data table database_path = ritual.\ Database\ ID_Record.db'from sqlite3 import connectwith connect (database_path) as conn: conn.execute ('CREATE TABLE IF NOT EXISTS Incremental_data_max_id_record (id INTEGER PRIMARY KEY AUTOINCREMENT,F_SDaqID_MAX TEXT,record_date datetime)')
Incremental latest records ID-F_SDaqID_MAX database storage
# data is saved to local txtdef text_save (filename, record): # filename is the path to write txt file, and record is to write F_SDaqID_MAX and record_date data list. File = open (filename,'a') append method # file = open (filename,' w') # override method for i in range (len (record)): s = str (record[ I]). Replace ('[','). Replace ('],') s = s.replace (",','). Replace (',',') +'\ n'# remove single quotes, commas Append the newline character file.write (s) file.close () to the end of each line
Incremental latest records ID-F_SDaqID_MAX temporary file storage
Incremental ID records provide two implementation schemes, one is data persistence storage mode, and the other is temporary file storage mode. The data persistence pattern, as its name implies, means that when creating objects, key operational information such as incremental ID-F_SDaqID_MAX can be recorded. This flag record mapping is a common design pattern.
Database connection class
To achieve real-time incremental data acquisition, we need to implement two database connection classes: incremental data ID storage class and incremental target data source class. Here, the singleton pattern is used to implement the database operation class, and the incremental service records are sequentially stored in the database or specific log files to maintain the consistency of the data.
1. Incremental data ID stores sqlite connection class code
Class Database_sqlite (metaclass=MetaSingleton): database_path = ringing.\ Database\ energy_rc_configure.db' connection = None def connect (self): if self.connection is None: self.connection = sqlite3.connect (self.database_path,check_same_thread=False,isolation_level=None) self.cursorobj = self.connection.cursor () return self.cursorobj Self.connection # insert maximum record @ staticmethod def Insert_Max_ID_Record (F1, f2): cursor = Database_sqlite () .connect () print (cursor) sql = f "insert into Incremental_data_max_id_record (Flying SDaqID MAXprecinct recording date) values (" {F1} ") "{f2}") "" cursor [0] .execute (sql) # sql = "insert into Incremental_data_max_id_record (FairSDaqID MAX recording recording date) values (?)" # cursor [0] .execute (sql, (f "{f1}", f "{f2}")) cursor [1] .commit () print ("insert successful!") # cursor [0] .close () return # fetch the latest ID record in the incremental database @ staticmethod def View_Max_ID_Records (): cursor = Database_sqlite () .connect () sql = "select max (F_SDaqID_MAX) from Incremental_data_max_id_record" cursor [0] .execute (sql) results = cursor [0] .fetchone () [0 ] # # Singleton mode does not need to close database connection # cursor [0] .close () print ("latest record ID" Results) return results # Delete data record ID @ staticmethod def Del_Max_ID_Records (): cursor = Database_sqlite () .connect () sql = "delete from Incremental_data_max_id_record where record_date = (select MAX (record_date) from Incremental_data_max_id_record)" cursor [0] .execute (sql) # results = cursor [0] .fetchone () [0] # # cursor [0] .close () cursor [1] .commit () print ("deleted successfully") return
2. Incremental data source sqlserver connection class code
Class Database_sqlserver (metaclass=MetaSingleton): "" # Real-time database "" connection = None # def connect (self): def _ _ init__ (self): if self.connection is None: self.connection = pymssql.connect (host= "xxxxx", user= "xxxxx", password= "xxxxx", database= "xxxxx" Charset= "utf8") if self.connection: print ("connection succeeded!") # Open database connection self.cursorobj = self.connection.cursor () # return self.cursorobj Self.connection # gets the maximum ID @ staticmethod def get_F_SDaqID_MAX () in the data source: # cursor_insert = Database_sqlserver (). Connect () cursor_insert = Database_sqlserver (). Cursorobj sql_MAXID = "" select MAX (F_SDaqID) from T_DaqDataForEnergy "cursor_insert.execute (sql_MAXID) # execute query statement Select all data in the table F_SDaqID_MAX = cursor_insert.fetchone () [0] # get record print ("maximum ID value: {0}" .format (F_SDaqID_MAX)) return F_SDaqID_MAX # extract incremental data @ staticmethod def get_incremental_data (incremental_Max_ID): # start getting incremental data sql_incremental_data = "" select F_ID FanciDatetime.Fidelity data from T_DaqDataForEnergy where F_ID > {0} "" .format (incremental_Max_ID) # cursor_find = Database_sqlserver () .connect () cursor_find = Database_sqlserver () .cursorobj cursor_find.execute (sql_incremental_data) # execute query statement Select all data in the table Target_data_source = cursor_find.fetchall () # get all data records # cursor_find.close () cursor_find.close () df = pd.DataFrame (Target_data_source, columns= ["F_ID", "F_Datetime" "F_Data"]) print ("extract data", df) return df
The design of data resource application service mainly considers the consistency of database operations and optimizes various database operations to improve the utilization of memory or CPU.
A variety of read and write operations are implemented, and the client operation calls API to perform the corresponding DB operation.
Note:
1. Use metaclass implementation to create classes with singleton characteristics.
Database_sqlserver (metaclass=MetaSingleton)
Database_sqlite (metaclass=MetaSingleton)
When you define a new class using class, the database class Database_sqlserver is decorated by MetaSingleton and metaclass is specified, then the special method _ _ call__ method of MetaSingleton is executed automatically.
Class MetaSingleton (type): _ instances= {} def _ _ call__ (cls, * args,**kwargs): if cls not in cls._instances: cls._ instances [CLS] = super (MetaSingleton,cls). _ _ call__ (* args,**kwargs) return cls._ instances [CLS]
The above code is based on the singleton implementation of the metaclass. When the client performs some operations on the database, the database class is instantiated multiple times, but only one object is created, so the call to the database is synchronous.
2. It is necessary to adopt a certain synchronization mechanism for multithreads to use the same database connection resources.
If the synchronization mechanism is not used, some unexpected situations may occur.
1) with cls.lock lock
Class MetaSingleton (type): _ instances= {} lock = threading.Lock () def _ _ call__ (cls, * args, * * kwargs): with cls.lock: if cls not in cls._instances: time.sleep # Simulation time-consuming cls._ instances [CLS] = super (MetaSingleton,cls). _ _ call__ (* args) * * kwargs) return cls._ instances [CLS]
The creation and release of locks consumes resources, and the above code must acquire the lock each time it is created.
3. If the program we develop is not a single application, but clustered, that is, multiple clients share a single database, resulting in database operations can not be synchronized, and database connection pool is a better choice. It greatly saves memory, improves the service efficiency of the server, and can support more customer services.
The solution of database connection pool is to establish enough database connections when the application starts, and these connections form a connection pool, and the application dynamically applies, uses, and releases the connections in the pool. For concurrent requests that exceed the number of connections in the connection pool, they should be queued in the request queue.
Incremental data service client
Incremental processing strategy: when loading for the first time, determine whether the latest record exists in the incremental data table, and if so, directly load it; otherwise, record the largest / latest data record ID or point in time, and save it to an incremental database or record file.
Only the largest / latest ID or data after the point in time is loaded from the second load. When the loading process is completed successfully, update the incremental database or record file synchronously, and update the last record ID or time point of this data record.
In general, this kind of data record table has self-growing columns, so you can also use self-growing columns to achieve this identification feature. For example, this time I used the data table growth column F_ID.
Class IncrementalRecordServer: _ servers = [] _ instance = None def _ _ new__ (cls, * args, * * kwargs): if not IncrementalRecordServer._instance: # IncrementalRecordServer._instance = super (). _ _ new__ (cls) IncrementalRecordServer._instance = super (IncrementalRecordServer,cls). _ _ new__ (cls) return IncrementalRecordServer._instance def _ init__ (self) ChangeServersID=None): "" variable initialization procedure "" self.F_SDaqID_MAX = Database_sqlserver (). Get_F_SDaqID_MAX () self.record_date = datetime.datetime.now () .strftime ('% Y-%m-%d% H15% MRV% S') self.changeServersID = changeServersID # callback to update the local record Empty record replacement Temporary record def record (func): def Server_record (self): v = func (self) text_save (filename=r "F:\ AutoOps_platform\ Database\ Server_record.txt" Record=IncrementalRecordServer._servers) print ("saved successfully") return v return Server_record # add service record @ record def addServer (self): self._servers.append ([int (self.F_SDaqID_MAX), self.record_date]) print ("add record") Database_sqlite.Insert_Max_ID_Record (f1=self.F_SDaqID_MAX F2=self.record_date) # modify service record @ record def changeServers (self): # self._servers.pop () # here pass manually modified record ID self._servers.append ([self.changeServersID,self.record_date]) # delete and then insert to modify Database_sqlite.Del_Max_ID_Records () Database_sqlite.Insert_Max_ID_Record (f1=self.changeServersID) F2=self.record_date) print ("update record") # Delete service record @ record def popServers (self): # self._servers.pop () print ("delete record") Database_sqlite.Del_Max_ID_Records () # latest service record def getServers (self): # print (self._servers [- 1]) Max_ID_Records = Database_sqlite.View_Max_ID_Records () print ("View record" Max_ID_Records) return Max_ID_Records # extract data def Incremental_data_client (self): "# extract data (incremental data MAXID acquisition And extract incremental data) "" # Real-time database # determine whether the latest record exists when loading for the first time if self.getServers () = = None: # insert incremental database ID self.addServer () # extract incremental data data = Database_sqlserver.get_incremental_data (self F _ SDaqID_MAX) return data # get the latest maximum ID record in the incremental database incremental_Max_ID = self.getServers () # add record self.addServer () # extract incremental data Target_data_source = Database_sqlserver.get_incremental_data (incremental_Max_ID) return Target_data_source
Optimization strategy:
1. Delayed loading mode
The above incremental record service class IncrementalRecordServer controls the creation of an object by overriding the _ _ new__ method. When creating an object, we will first check whether the object exists. It can also be achieved by lazy loading, and the resource saving optimization is as follows.
Class IncrementalRecordServer: _ servers = [] _ instance = None def _ _ init__ (self ChangeServersID=None): "" variable initialization procedure "" self.F_SDaqID_MAX = Database_sqlserver (). Get_F_SDaqID_MAX () self.record_date = datetime.datetime.now (). Strftime ('% Y-%m-%d% H15% MRV% S') self.changeServersID = changeServersID if not IncrementalRecordServer._instance: print ("_ _ init__ object creation") else: print ("object already exists:" IncrementalRecordServer._instance) self.getInstance () @ classmethod def getInstance (cls): if not cls._instance: cls._instance = IncrementalRecordServer () return cls._instance
Lazy instantiation ensures that objects are not created until they are actually needed. When a = IncrementalRecordServer () is instantiated, the initialization _ _ init__ method is called, but no new objects are created. Lazy loading of class objects in this way, also known as deferred loading.
2. The singleton model can make effective use of spatial resources, using the same spatial resources each time.
Different operands have the same memory address, and different object initialization overrides the previous object initialization variable to ensure that the latest records are updated in real time. On the surface, the above code implements the singleton pattern, but in the case of multi-thread concurrency, there is a thread safety problem, and different object spaces may be created at the same time. For thread safety, locking processing can also be further added.
3. Scope of application and matters needing attention
This code is suitable for deploying the incremental data produced after the production is run at a specified point in time. If it is not enabled for a long time, reboot requires emptying the history record, that is, the incremental database or file ID needs to be emptied. Generally, there is no problem for real-time data increment to be loaded at once, so there is no need to pay much attention to this (the file code can be improved by itself) When loading a historical database or a fixed time interval produces too much data, the code needs to be further modified, the data scale needs to be judged, the starting node and the amount of data loaded need to be specified, and the scheme of 100 million-level data extraction will be shared next time, taking into account the comprehensive factors.
4. Learn more about Python garbage collection mechanism; in concurrent cases, optimize the thread pool to manage resources.
Finally, you can add a function to release resources
Def _ _ del__ (self): class_name = self.__class__.__name__ print (class_name, "destroy")
Del obj calls _ _ del__ () to destroy the object, freeing up its space; only the Python object is released when the object is no longer referenced. When other variables in the program refer to the instance object, even if the _ _ del__ () method is called manually, the method will not be executed immediately. This is related to the implementation of Python's garbage collection mechanism.
Results Test if _ _ name__ ='_ _ main__': for i in range (6): hc1 = IncrementalRecordServer () hc1.addServer () print ("Record_ID") Hc1._ del hc1 time.sleep [I] # del hc1 time.sleep (60) # Server2- client client # latest service record hc2 = IncrementalRecordServer () hc2.getServers () # View incremental data hc2.Incremental_data_client ()
Insert record
The simulation inserts a record every 1 minute and inserts 7 records into the incremental database
If _ _ name__ = ='_ main__': # Server3- client client # manually add incremental starting ID record hc3 = IncrementalRecordServer (changeServersID='346449980') hc3.changeServers ()
If _ _ name__ = ='_ main__': # Delete ID hc3 = IncrementalRecordServer (changeServersID='346449980') # hc3.changeServers () hc3.popServers ()
These are all the contents of the article "how to implement the Real-time incremental data loading tool for Python". Thank you for reading! Hope to share the content to help you, more related knowledge, welcome to follow the industry information channel!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.