In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-20 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
The following mainly brings you the practical analysis of the design and implementation of the distributed crawler system. I hope these words can bring you practical use, which is also the main purpose of my editing this article. All right, don't talk too much nonsense, let's just read the following.
1 Overview
In the case of not using the crawler framework, after multi-party learning, we try to implement a distributed crawler system, and can save the data to different places, such as MySQL, HBase and so on.
Based on the idea of interface-oriented coding to develop, so the system has a certain expansibility, interested friends directly look at the code, you can understand its design ideas, although the code is still tightly coupled in many places at present, but as long as it takes some time and effort, many of them can be extracted and configured.
Because of the lack of time, I only wrote about the crawlers of JD.com and SUNING, but I can completely realize the random scheduling of crawlers on different websites. based on its code structure, it is not difficult to crawl goods such as Gome and Tmall, but it is estimated that it will take a lot of time and effort. Because it actually takes a lot of time to parse the data on the web page, for example, when I crawl the price of SUNING's goods, the price is obtained asynchronously, and its api is a long combination of numbers, it took me several hours to find its pattern, and of course I admit that I am inexperienced.
In addition to the basic data crawling, the design of this system pays more attention to the following aspects:
1. How to achieve distribution, when the same program is packaged and distributed to different nodes, it does not affect the overall data crawling 2. How to realize url random round robin scheduling, the core is to do random 3. 5 for different top-level domain names. How to add seed url to url repository regularly to keep the crawler system from stopping? 4. How to achieve the monitoring of the crawler node program, and can send an email alarm 5. How to implement a random IP proxy library, which is similar to the second point, is for the purpose of anti-crawler.
The following will make an overall basic introduction to this system, in fact, I have very detailed comments in the code, interested friends can refer to the code, and finally I will give some data analysis of my crawler.
It is also important to note that the crawler system is implemented based on Java, but the language itself is still not the most important. Interested friends can try to implement it in Python.
2 distributed crawler system architecture
The overall system architecture is as follows:
So from the above architecture, we can see that the whole system is mainly divided into three parts:
Crawler system URL dispatching system Monitoring and alarm system
The crawler system is used to crawl data, because the system is designed to be distributed, so the crawler itself can run on different cloud server nodes.
The core of url scheduling system is url warehouse. The so-called url warehouse actually uses Redis to store the url list that needs to be crawled, and consumes the url in our url scheduler according to certain policies. From this point of view, url warehouse is also a url queue.
The monitoring and alarm system mainly monitors the crawler node, although the death of one of the crawler nodes executed in parallel has no effect on the overall data crawling itself (only reduces the speed of the crawler). But we still want to know that we can actively receive the notification of the node hanging, rather than passively find it.
The following will be aimed at the above three aspects and combined with some code snippets to do some basic introduction to the design ideas of the whole system, friends who are interested in the complete implementation of the system can directly refer to the source code.
3 crawler system
(note: zookeeper monitoring belongs to monitoring and alarm system, url scheduler belongs to URL dispatching system)
The crawler system is an independent process, we package our crawler system into jar packages, and then distribute it to different nodes for execution, so that parallel crawling data can improve the efficiency of the crawler.
3.1Random IP Agent
Random IP proxies are added mainly for anti-crawler purposes, so if there is an IP proxy library and different proxies can be used randomly when building http clients, it will be very helpful for us to do anti-crawler.
To use the IP proxy library in the system, you need to first add the available proxy address information to the text file:
# IPProxyRepository.txt58.60.255.104:8118219.135.164.245:312827.44.171.27:9999219.135.164.245:312858.60.255.104:811858.252.6.165:9000.
It should be noted that the above agent IP is some of the agent IP I got on the West thorn agent, which may not be available. It is recommended to buy a batch of agent IP by yourself, which can save a lot of time and energy to find the agent IP.
Then, in the utility class that builds the http client, when the utility class is used for the first time, the proxy IP is loaded into memory and into a HashMap of Java:
/ / IP address proxy library Mapprivate static Map IPProxyRepository = new HashMap (); private static String [] keysArray = null; / / keysArray is to facilitate the generation of random proxy objects / * use static code blocks to load the IP proxy library into set * / static {InputStream in = HttpUtil.class.getClassLoader (). GetResourceAsStream ("IPProxyRepository.txt") / / load text containing proxy IP / / build buffered stream object InputStreamReader isr = new InputStreamReader (in); BufferedReader bfr = new BufferedReader (isr); String line = null; try {/ / cycle through each line and add while ((line = bfr.readLine ())! = null) {String [] split = line.split (":") to map / / with: as a delimiter, that is, the data format in the text should be 192.168.1.1 String host = split [0]; int port = Integer.valueOf (split [1]); IPProxyRepository.put (host, port);} Set keys = IPProxyRepository.keySet (); keysArray = keys.toArray (new String [keys.size ()]) / / keysArray is to facilitate the generation of random proxy objects} catch (IOException e) {e.printStackTrace ();}}
After that, every time you build a http client, you will first go to map to see if there is a proxy IP. If you do, you will use it. If not, you will not use a proxy:
CloseableHttpClient httpClient = null;HttpHost proxy = null;if (IPProxyRepository.size () > 0) {/ / if the ip proxy address library is not empty, set proxy proxy = getRandomProxy (); httpClient = HttpClients.custom (). SetProxy (proxy). Build (); / / create httpclient object} else {httpClient = HttpClients.custom (). Build (); / create httpclient object} HttpGet request = new HttpGet (url); / / build htttp get request.
Random proxy objects are generated by the following methods:
/ * randomly return a proxy object * * @ return * / public static HttpHost getRandomProxy () {/ / randomly get host:port and build a proxy object Random random = new Random (); String host = keysArray [room.nextInt (keysArray.length)]; int port = IPProxyRepository.get (host); HttpHost proxy = new HttpHost (host, port); / / set http proxy return proxy;}
In this way, through the above design, the function of random IP agent is basically realized, of course, there are many things that can be improved, for example, when using this IP agent and the request fails, can this situation be recorded? when it exceeds a certain number of times, it can be deleted from the agent library, and a log can be generated for developers or operators to refer to. But I won't do this function.
3.2 Web page downloader
The web downloader is used to download the data in the web page, which is mainly based on the following interfaces:
/ * Web data download * / public interface IDownload {/ * download the web data of a given url * @ param url * @ return * / public Page download (String url);}
Based on this, only one http get downloader has been implemented in the system, but it can also accomplish the functions we need:
/ * * data download implementation class * / public class HttpGetDownloadImpl implements IDownload {@ Override public Page download (String url) {Page page = new Page (); String content = HttpUtil.getHttpContent (url); / / get web data page.setUrl (url); page.setContent (content); return page;} 3.3 Web parser
The web page parser parses the data we are interested in in the downloaded web page and saves it to an object for further processing in the data memory to save to different persistence repositories. It is developed based on the following interfaces:
/ * Web data parsing * / public interface IParser {public void parser (Page page);}
The web parser is also a relatively important component in the development of the whole system. The function is not complex, mainly because there is a lot of code. The corresponding parser may be different for different merchandise in different shopping malls. Therefore, it is necessary to develop products for special shopping malls, because obviously, the web page template used by JD.com is definitely different from that used by SUNING, and the one used by Tmall is definitely different from that used by JD.com. So it all depends on your own needs for development, only that you will find some duplicate code in the parser development process, and then you can abstract the code to develop a tool class.
At present, the mobile product data of JD.com and SUNING are crawled in the system, so these two implementation classes are written:
/ * * parsing JD.com commodity implementation class * / public class JDHtmlParserImpl implements IParser {.} / * * SUNING web page parsing * / public class SNHtmlParserImpl implements IParser {.} 3.4 data memory
The data storage mainly stores the data objects parsed by the web page parser to different ones. For the mobile products crawled this time, the data object is the following Page object:
/ * Web objects, mainly including web content and product data * / public class Page {private String content; / / web content private String id; / / commodity Id private String source; / / commodity source private String brand; / / commodity brand private String title / / Commodity title private float price; / / Commodity Price private int commentCount; / / Commodity comments private String url; / / Commodity address private String imgUrl; / / Commodity picture address private String params; / / Commodity specification parameter private List urls = new ArrayList () / / the container used to save the parsed product url when parsing the list page}
Accordingly, in MySQL, the table data structure is as follows:
-Table structure for phone---DROP TABLE IF EXISTS `phone` CREATE TABLE `phone` (`id` varchar (30) CHARACTER SET armscii8 NOT NULL COMMENT 'Commodity id', `source` varchar (30) NOT NULL COMMENT' Commodity Source Such as jd suning gome, `brand` varchar (30) DEFAULT NULL COMMENT', `title` varchar (255) DEFAULT NULL COMMENT 'handset title', `price`float (10pime2) DEFAULT NULL COMMENT 'cellphone price', `comment_ count` varchar (30) DEFAULT NULL COMMENT 'handset comment', `url`varchar 'DEFAULT NULL COMMENT' handset detail address', `img_ url`DEFAULT NULL COMMENT 'picture address', `params` text COMMENT 'handset parameters Json format storage', PRIMARY KEY (`id`, `source`) ENGINE=InnoDB DEFAULT CHARSET=utf8
The table structure in HBase is as follows:
# # cf1 Storage id source price comment brand url## cf2 Storage title params imgUrlcreate 'phone',' cf1' 'cf2'## looks at the created table HBASE (main): 135main 0 > desc 'phone'Table phone is ENABLED phone in HBase shell COLUMN FAMILIES DESCRIPTION {NAME = > 'cf1' BLOOMFILTER = > 'ROW', VERSIONS = >' 1century, IN_MEMORY = > 'false', KEEP_DELETED_CELLS = >' FALSE', DATA_BLOCK_ENCODING = > 'NONE', TTL = >' FOREVER', COMPRESSION = > 'NONE', MIN_VERSIONS = >' 0mm, BLOCKCACHE = > 'true', BLOCKSIZE = >' 65536', REPLICATION_SCOPE = >'0'} {NAME = > 'cf2' BLOOMFILTER = > 'ROW', VERSIONS = >' 1century, IN_MEMORY = > 'false', KEEP_DELETED_CELLS = >' FALSE', DATA_BLOCK_ENCODING = > 'NONE', TTL = >' FOREVER', COMPRESSION = > 'NONE', MIN_VERSIONS = >' 013', BLOCKCACHE = > 'true', BLOCKSIZE = >' 65536', REPLICATION_SCOPE = >'0'} 2 row (s) in 0.0350 seconds
That is, two column families are established in HBase, namely cf1 and cf2, in which cf1 is used to save id source price comment brand url field information and cf2 is used to save title params imgUrl field information.
Different data stores use different implementation classes, but they are all developed based on the following same interface:
/ * * Storage of commodity data * / public interface IStore {public void store (Page page);}
Then based on this, we develop the storage implementation class of MySQL, the storage implementation class of HBase and the output implementation class of the console, such as the storage implementation class of MySQL, which is actually a simple data insertion statement:
/ * write data to the mysql table using dbc database connection pool * / public class MySQLStoreImpl implements IStore {private QueryRunner queryRunner = new QueryRunner (DBCPUtil.getDataSource ()); @ Override public void store (Page page) {String sql = "insert into phone (id, source, brand, title, price, comment_count, url, img_url, params) values (?,?)" Try {queryRunner.update (sql, page.getId (), page.getSource (), page.getBrand (), page.getTitle (), page.getPrice (), page.getCommentCount (), page.getUrl ()) Page.getImgUrl (), page.getParams () } catch (SQLException e) {e.printStackTrace ();}
The storage implementation class of HBase is the common insert statement code of HBase Java API:
. / / cf1:pricePut pricePut = new Put (rowKey); / / it must be determined whether it is null, otherwise there will be null pointer exception pricePut.addColumn (cf1, "price" .getBytes (), page.getPrice ()! = null? String.valueOf (page.getPrice ()). GetBytes (): "" .getBytes (); puts.add (pricePut); / / cf1:commentPut commentPut = new Put (rowKey); commentPut.addColumn (cf1, "comment" .getBytes (), page.getCommentCount ()! = null? String.valueOf (page.getCommentCount ()). GetBytes (): "" .getBytes (); puts.add (commentPut); / / cf1:brandPut brandPut = new Put (rowKey); brandPut.addColumn (cf1, "brand" .getBytes (), page.getBrand ()! = null? Page.getBrand (). GetBytes (): "" .getBytes (); puts.add (brandPut);.
Of course, when initializing the crawler, you can manually choose where to store the data:
/ / 3. Injection memory iSpider.setStore (new HBaseStoreImpl ())
At present, the code has not been written so that it can be stored in multiple places at the same time, according to the current architecture of the code, it is relatively easy to achieve this, just modify the corresponding code. In fact, you can first save the data to MySQL, and then import it into HBase through Sqoop. For details, you can refer to the Sqoop article I wrote.
It is still important to note that if you determine that you need to save the data to HBase, make sure that you have an available cluster environment, and you need to add the following configuration document to classpath:
Core-site.xmlhbase-site.xmlhdfs-site.xml
Students who are interested in big data can toss this point. If you haven't touched it before, just use MySQL storage directly. You only need to inject MySQL memory when initializing the crawler program:
/ / 3. Injected memory iSpider.setStore (new MySQLStoreImpl ()); 4 URL scheduling system
URL scheduling system is the bridge and key to realize the distribution of the whole crawler system. It is through the use of URL scheduling system that the whole crawler system can efficiently obtain url randomly (Redis as storage) and realize the distribution of the whole system.
4.1 URL Warehouse
From the architecture diagram, we can see that the so-called URL warehouse is just a Redis warehouse, that is, we use Redis to store the url address list in our system. Only in this way can we ensure that our program is distributed, as long as the url is unique, so that no matter how many crawlers we have, there is only one copy of the data saved in the end, and it will not be repeated.
At the same time, the strategy of obtaining the url address in the url warehouse is realized through the queue, which will be known through the implementation of the URL scheduler.
In addition, the following data is mainly saved in our url repository:
Seed URL list
The data type of Redis is list.
The seed URL is persisted. After a certain period of time, the URL timer gets the URL through the seed URL and injects it into the high-priority URL queue that our crawler needs to use, so that our crawler can keep crawling data without suspending the execution of the program.
High priority URL queues
The data type of Redis is set.
What is a high priority URL queue? In fact, it is used to save the list url.
So what is a list url?
To put it bluntly, there are multiple items in a list. Taking JD.com as the column, let's open a mobile phone list as an example: this address does not contain the url of a specific product, but a list of data (mobile products) that we need to crawl. Through the analysis of each high-level url, we can get a lot of specific product url, and the specific product url, that is, low priority url. It is saved to a low-priority URL queue.
So take this system as an example, the saved data is similar to the following:
Jd.com.higher-- https://list.jd.com/list.html?cat=9987,653,655&page=1... Suning.com.higher-- https://list.suning.com/0-20006-0.html... Low priority URL queues
The data type of Redis is set.
Low priority URL is actually the URL of a specific product, such as the following mobile product:
By downloading the data of the url and parsing it, we can get the data we want.
So take this system as an example, the saved data is similar to the following:
Jd.com.lower-- https://item.jd.com/23545806622.html... suning.com.lower-- https://product.suning.com/0000000000/690128156.html... 4.2 URL scheduler
To put it bluntly, the so-called url scheduler is the scheduling strategy of the java code in url warehouse, but because its core is scheduling, it is explained in the URL scheduler. Currently, its scheduling is based on the following APIs:
/ * url warehouse * main functions: * add url (high priority list, low priority goods url) to the warehouse * get url from the warehouse (get high priority url first, if not Then get the low priority url) * * / public interface IRepository {/ * the method to get the url * get the url from the warehouse (give priority to the high priority url, if not, get the low priority url) * @ return * / public String poll () / * add goods list url * @ param highUrl * / public void offerHigher (String highUrl) to the high priority list; / * add goods url * @ param lowUrl * / public void offerLower (String lowUrl) to the low priority list;}
The implementation of the URL repository based on Redis is as follows:
/ * the whole web crawler based on Redis randomly obtains the crawler url: * * the data structure used to store url in Redis is as follows: * 1. The collection of domain names to be crawled (the storage data type is set, which needs to be added in Redis first) * key * spider.website.domains * value (set) * jd.com suning.com gome.com * key is obtained by constant object SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY * 2. The high and low priority url queues corresponding to each domain name (storage data type is list This is dynamically added by the crawler after parsing the seed url) * key * jd.com.higher * jd.com.lower * suning.com.higher * suning.com.lower * gome.com.higher * gome.come.lower * value (list) * corresponding to the list of url to be parsed * Key is obtained by random domain name + constant SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX or SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX. Seed url list * key * spider.seed.urls * value (list) * seed url * key of the data to be crawled is obtained by constant SpiderConstants.SPIDER_SEED_URLS_KEY * * the url in the seed url list is timed by the url scheduler to * / public class RandomRedisRepositoryImpl implements IRepository {/ * in the high and low priority url queue. Manufacturing method * / public RandomRedisRepositoryImpl () {init () } / * initialization method: when initializing, delete all the high and low priority url queues in redis * otherwise, when the url in the last url queue is not consumed, stop starting and running for the next time, it will result in duplicate url * / public void init () {Jedis jedis = JedisUtil.getJedis () in the url warehouse. Set domains = jedis.smembers (SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); String higherUrlKey; String lowerUrlKey; for (String domain: domains) {higherUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX; lowerUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX; jedis.del (higherUrlKey, lowerUrlKey);} JedisUtil.returnJedis (jedis) } / * get url from the queue. The current policy is: * 1. First get * 2 from the high priority url queue. Then get from the low-priority url queue * corresponding to our actual scenario, we should first parse the list url and then parse the commodity url * but it should be noted that in a distributed multithreaded environment, it must not be completely guaranteed, because at some point the url of * in the high-priority url queue is exhausted, but in fact the program is still parsing the next high-priority url. Other threads will definitely not get the high-priority queue url * then they will get the url in the low-priority queue. When actually considering the analysis, we should pay special attention to * @ return * / @ Override public String poll () {/ / randomly get a top-level domain Jedis jedis = JedisUtil.getJedis () from set. String randomDomain = jedis.srandmember (SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); / / jd.com String key = randomDomain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX; / / jd.com.higher String url = jedis.lpop (key); if (url = = null) {/ / if it is null, get key = randomDomain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX from the low priority / / jd.com.lower url = jedis.lpop (key);} JedisUtil.returnJedis (jedis); return url;} / * * add url * @ param highUrl * / @ Override public void offerHigher (String highUrl) {offerUrl (highUrl, SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX) to the high priority url queue } / * add url * @ param lowUrl * / @ Override public void offerLower (String lowUrl) {offerUrl (lowUrl, SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX) to the low priority url queue } / * A common way to add url, which is abstracted by offerHigher and offerLower. The url * @ param urlTypeSuffix url type suffix that needs to be added by * @ param url or .lower * / public void offerUrl (String url, String urlTypeSuffix) {Jedis jedis = JedisUtil.getJedis (); String domain = SpiderUtil.getTopDomain (url) / / obtain the top-level domain corresponding to url, such as jd.com String key = domain + urlTypeSuffix; / / key of stitching url queues, such as jd.com.higher jedis.lpush (key, url); / / add url JedisUtil.returnJedis (jedis) to url queues;}}
Through code analysis, we can also know that the core is how to schedule url in the url repository (Redis).
4.3 URL timer
After a period of time, the url in both the high-priority URL queue and the low-priority URL queue will be consumed. In order to enable the program to continue crawling data and reduce human intervention, the seed url can be inserted into the Redis in advance, and then the URL timer will take out the url from the seed url and store it in the high-priority URL queue at a fixed time, so as to achieve the purpose of the program crawling data without interruption.
After url consumption, whether you need to crawl data continuously varies according to your business needs, so this step is not necessary, but it also provides such an operation. Because in fact, the data we need to crawl will be updated every once in a while, and if we want the crawled data to be updated regularly, then the timer will play a very important role. However, it should be noted that once you have decided that you need to crawl data repeatedly, you need to consider the problem of duplicate data when designing the memory implementation, that is, duplicate data should be an update operation. At present, this function is not included in the memory I designed. Interested friends can implement it on their own, just need to determine whether the data exists in the database before inserting the data.
It is also important to note that the URL timer is a separate process that needs to be started separately.
The timer is implemented based on Quartz, and the following is the code for its job:
/ * obtain seed url from url repository at a fixed time every day and add it to the high priority list * / public class UrlJob implements Job {/ / log4j logging private Logger logger = LoggerFactory.getLogger (UrlJob.class); @ Override public void execute (JobExecutionContext context) throws JobExecutionException {/ * 1. Get the seed url * 2 from the specified url seed warehouse. Add seed url to the high priority list * / Jedis jedis = JedisUtil.getJedis (); Set seedUrls = jedis.smembers (SpiderConstants.SPIDER_SEED_URLS_KEY); / / spider.seed.urls Redis data type is set to prevent repeated addition of seed url for (String seedUrl: seedUrls) {String domain = SpiderUtil.getTopDomain (seedUrl) / / seed url top-level domain jedis.sadd (domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX, seedUrl); logger.info ("get seed: {}", seedUrl);} JedisUtil.returnJedis (jedis); / / System.out.println ("Scheduler Job Test...");}}
The implementation of the scheduler is as follows:
/ * url timing scheduler to regularly store seeds in the corresponding warehouse of url * * Business regulations: every morning at 01:10 to store seeds url * / public class UrlJobScheduler {public UrlJobScheduler () {init ();} / * initialize the scheduler * / public void init () {try {Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler () / / if there is no execution of the following start method, the task scheduling scheduler.start () will not be enabled; String name = "URL_SCHEDULER_JOB"; String group = "URL_SCHEDULER_JOB_GROUP"; JobDetail jobDetail = new JobDetail (name, group, UrlJob.class); String cronExpression = "0 101 * *?" Trigger trigger = new CronTrigger (name, group, cronExpression); / / schedule task scheduler.scheduleJob (jobDetail, trigger);} catch (SchedulerException e) {e.printStackTrace ();} catch (ParseException e) {e.printStackTrace ();}} public static void main (String [] args) {UrlJobScheduler urlJobScheduler = new UrlJobScheduler () UrlJobScheduler.start ();} / * * schedule tasks regularly * because we need to regularly obtain seed url from the designated warehouse every day and store it in the high priority url list * so it is an uninterrupted program, so we cannot stop * / private void start () {while (true) {}} 5 to monitor the alarm system.
The main purpose of the monitoring and alarm system is to enable users to find the downtime of the node actively rather than passively, because in practice, the crawler may be running continuously, and we will deploy our crawler on multiple nodes, so it is necessary to monitor the node and find and correct it in time when there is something wrong with the node. The monitoring and alarm system is an independent process, which needs to be started separately.
5.1 basic principles
First, you need to create a / ispider node in zookeeper:
[zk: localhost:2181 (CONNECTED) 1] create / ispider ispiderCreated / ispider
The development of the monitoring and alarm system mainly depends on the implementation of zookeeper, and the monitoring program listens to the node directory under zookeeper:
[zk: localhost:2181 (CONNECTED) 0] ls / ispider []
When the crawler starts, it registers a temporary node directory under that node directory:
[zk: localhost:2181 (CONNECTED) 0] ls / ispider [192.168.43.166]
When a node goes down, the temporary node directory will be deleted by zookeeper
[zk: localhost:2181 (CONNECTED) 0] ls / ispider []
At the same time, because we listen to the node directory / ispider, when zookeeper deletes the node directory under it (or adds a node directory), zookeeper will send a notification to our monitoring program, that is, our monitoring program will be called back, so that we can perform the system action of alarm in the callback program, thus completing the function of monitoring alarm.
5.2 zookeeper Java API usage instructions
You can use zookeeper's native Java API, which I wrote in another RPC framework (the underlying Netty-based remote communication) using native API, but obviously the code is much more complicated and requires more learning and understanding of zookeeper, so it's easier to use.
So in order to reduce the difficulty of development, we use the third-party encapsulated API, that is, curator, to develop the zookeeper client program.
5.3 crawler system zookeeper registration
When starting the crawler system, our program starts a zookeeper client to register its own node information, mainly ip address, with zookeeper, and creates a node named after the node IP address of the crawler, such as / ispider/192.168.43.116, in the / ispider node directory. The code is as follows:
/ * Register zk * / private void registerZK () {String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181"; int baseSleepTimeMs = 1000; int maxRetries = 3; RetryPolicy retryPolicy = new ExponentialBackoffRetry (baseSleepTimeMs, maxRetries); CuratorFramework curator = CuratorFrameworkFactory.newClient (zkStr, retryPolicy); curator.start (); String ip = null Try {/ / registers the write node creation node ip = InetAddress.getLocalHost (). GetHostAddress (); curator.create (). WithMode (CreateMode.EPHEMERAL) .forPath ("/ ispider/" + ip, ip.getBytes ());} catch (UnknownHostException e) {e.printStackTrace ();} catch (Exception e) {e.printStackTrace ();}}
It should be noted that the node we created is a temporary node, and in order to achieve the monitoring and alarm function, it must be a temporary node.
5.4 Monitoring Program
First, you need to listen to a node directory in zookeeper. In our system, the design is to listen to the node directory / ispider:
Public SpiderMonitorTask () {String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181"; int baseSleepTimeMs = 1000; int maxRetries = 3; RetryPolicy retryPolicy = new ExponentialBackoffRetry (baseSleepTimeMs, maxRetries); curator = CuratorFrameworkFactory.newClient (zkStr, retryPolicy); curator.start (); try {previousNodes = curator.getChildren (). UsingWatcher (this) .forPath ("/ ispider");} catch (Exception e) {e.printStackTrace ();}}
Register the watcher in zookeeper above, that is, the callback program that receives the notification, in which we execute the logic of our alarm:
/ * this method, when the directory corresponding to the monitored zk is changed, it will be called * to get the latest node status and compare the latest node state with the initial or previous node state. Then we know who caused the node change * @ param event * / @ Overridepublic void process (WatchedEvent event) {try {List currentNodes = curator.getChildren (). UsingWatcher (this) .forPath ("/ ispider") / / HashSet previousNodesSet = new HashSet (previousNodes) If (currentNodes.size () > previousNodes.size ()) {/ / the latest node services, exceeding the number of previous node services For (String node: currentNodes) {if (! previousNodes.contains (node)) {/ / the current node is the new node logger.info ("- new crawler node {} added", node) } else if (currentNodes.size ()
< previousNodes.size()) { // 有节点挂了 发送告警邮件或者短信 for(String node : previousNodes) { if(!currentNodes.contains(node)) { // 当前节点挂掉了 得需要发邮件 logger.info("----有爬虫节点{}挂掉了", node); MailUtil.sendMail("有爬虫节点挂掉了,请人工查看爬虫节点的情况,节点信息为:", node); } } } // 挂掉和新增的数目一模一样,上面是不包括这种情况的,有兴趣的朋友可以直接实现包括这种特殊情况的监控 previousNodes = currentNodes; // 更新上一次的节点列表,成为最新的节点列表 } catch (Exception e) { e.printStackTrace(); } // 在原生的API需要再做一次监控,因为每一次监控只会生效一次,所以当上面发现变化后,需要再监听一次,这样下一次才能监听到 // 但是在使用curator的API时则不需要这样做} 当然,判断节点是否挂掉,上面的逻辑还是存在一定的问题的,按照上面的逻辑,假如某一时刻新增节点和删除节点事件同时发生,那么其就不能判断出来,所以如果需要更精准的话,可以将上面的程序代码修改一下。 5.5 邮件发送模块 使用模板代码就可以了,不过需要注意的是,在使用时,发件人的信息请使用自己的邮箱。 下面是爬虫节点挂掉时接收到的邮件: 实际上,如果购买了短信服务,那么通过短信API也可以向我们的手机发送短信。 6 实战:爬取京东、苏宁易购全网手机商品数据 因为前面在介绍这个系统的时候也提到了,我只写了京东和苏宁易购的网页解析器,所以接下来也就是爬取其全网的手机商品数据。 6.1 环境说明 需要确保Redis、Zookeeper服务可用,另外如果需要使用HBase来存储数据,需要确保Hadoop集群中的HBase可用,并且相关配置文件已经加入到爬虫程序的classpath中。 还有一点需要注意的是,URL定时器和监控报警系统是作为单独的进程来运行的,并且也是可选的。 6.2 爬虫结果 进行了两次爬取,分别尝试将数据保存到MySQL和HBase中,给出如下数据情况。 6.2.1 保存到MySQLmysql>Select count (*) from phone;+-+ | count (*) | +-+ | 12052 | +-1 row in setmysql > select count (*) from phone where source='jd.com';+-+ | count (*) | +-+ | 9578 | +-1 row in setmysql > select count (*) from phone where source='suning.com' +-+ | count (*) | +-+ | 2474 | +-+ 1 row in set
View the data in the visualization tool:
6.2.2 Save to HBasehbase (main): 225 count 'phone'Current count: 1000, row: 11155386088_jd.comCurrent count: 2000, row: 136191393_suning.comCurrent count: 3000, row: 16893837301_jd.comCurrent count: 4000, row: 19036619855_jd.comCurrent count: 5000, row: 1983786945_jd.comCurrent count: 6000, row: 1997392141_jd.comCurrent count: 7000, row: 21798495372_jd.comCurrent count: 8000, row: 24154264902_jd.comCurrent count: 9000, row: 25687565618_jd.comCurrent count: 10000 Row: 26458674797_jd.comCurrent count: 11000, row: 617169906_suning.comCurrent count: 12000, row: 769705049_suning.com 12348 row (s) in 1.5720 seconds= > 12348
View the data in HDFS:
6.2.3 Analysis of data volume and actual situation of JD.com
The list of JD.com 's mobile phone has more than 160 pages, and each list has 60 commodity data, so the total amount is about 9600, and our data is basically consistent. Later, through log analysis, we can know that the lost data is generally caused by connection timeout, so when selecting the environment of the crawler, it is more recommended to do it on the host with a good network environment. At the same time, it would be better if there is an IP proxy address library. In addition, for the connection timeout, in fact, we can further control in our program, once there is a data crawl failure of the url, you can add it to the retry url queue, at present, I do not do this function, interested students can try.
SUNING
Let's take a look at SUNING's. It has a list of about 100 pages of mobile phones, and each page also has 60 commodity data, so the total is about 6000. However, we can see that our data is of the order of magnitude of 3000 (what is still missing is the connection failure caused by frequent crawling). Why?
This is because, after opening a list page of SUNING, it first loads 30 items, and when the mouse slides down, it will load the other 30 goods data through another API, every list page is like this, so, in fact, we lack half of the commodity data without crawling. After knowing this reason, it is not difficult to achieve, but because of time constraints, I did not do it, interested friends toss about it.
6.3 analyze the performance of the crawler system through logs
In our crawler system, every key place, such as web page download, data analysis, etc., have hit logger, so through the log, we can roughly analyze the relevant time parameters.
2018-04-01 21:26:03 [pool-1-thread-1] [cn.xpleaf.spider.utils.HttpUtil] [INFO]-download page: https://list.jd.com/list.html?cat=9987,653,655&page=1, consumption time: 590 ms Agent information: null:null2018-04-01 21:26:03 [pool-1-thread-1] [cn.xpleaf.spider.core.parser.Impl.JDHtmlParserImpl] [INFO]-parse list page: https://list.jd.com/list.html?cat=9987,653,655&page=1, Consumption duration: 46ms2018-04-01 21:26:03 [pool-1-thread-3] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO]-parsing list page: https://list.suning.com/0-20006-0.html, Consumption time: 49ms2018-04-01 21:26:04 [pool-1-thread-5] [cn.xpleaf.spider.utils.HttpUtil] [INFO]-download page: https://item.jd.com/6737464.html Consumption time: 219 ms, agent information: 21:26:04 [pool-1-thread-2] [cn.xpleaf.spider.utils.HttpUtil] [INFO]-download page: https://list.jd.com/list.html?cat=9987,653,655&page=2&sort=sort_rank_asc&trans=1&JL=6_0_0, consumption time: 276 ms Agent information: null:null2018-04-01 21:26:04 [pool-1-thread-4] [cn.xpleaf.spider.utils.HttpUtil] [INFO]-download page: https://list.suning.com/0-20006-99.html, consumption time: 300 ms Agent information: null:null2018-04-01 21:26:04 [pool-1-thread-4] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO]-parse list page: https://list.suning.com/0-20006-99.html, Consumption time: 4ms.2018-04-01 21:27:49 [pool-1-thread-3] [cn.xpleaf.spider.utils.HttpUtil] [INFO]-download page: https://club.jd.com/comment/productCommentSummaries.action?referenceIds=23934388891 Consumption time: 176 ms Agent information: null:null2018-04-01 21:27:49 [pool-1-thread-3] [cn.xpleaf.spider.core.parser.Impl.JDHtmlParserImpl] [INFO]-parsing merchandise page: https://item.jd.com/23934388891.html, Consumption time: 413ms2018-04-01 21:27:49 [pool-1-thread-2] [cn.xpleaf.spider.utils.HttpUtil] [INFO]-download page: https://review.suning.com/ajax/review_satisfy/general-00000000010017793337-0070079092-----satisfy.htm Consumption time: 308 ms, agent information: null:null2018-04-01 21:27:49 [pool-1-thread-2] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO]-parsing commodity page: https://product.suning.com/0070079092/10017793337.html, consumption time: 588ms.
On average, the time to download data from a product web page varies from 20000500 milliseconds, depending on the network at the time.
In addition, if you really want to calculate the data for crawling a product, you can calculate it through the data below the log:
Time to download data from a product page, time to get price data, time to get comment data
On my host (CPU:E5 10 core, memory: 32GB, 1 virtual machine and 3 virtual machines, respectively), the situation is as follows:
Number of nodes per node Thread number of goods time 15 JD.com + SUNING nearly 13000 commodity data 141 minutes 35 JD.com + SUNING nearly 13000 commodity data 65 minutes
As you can see, when using three nodes, the time will not be reduced to the original one, which is because the problems affecting the performance of the crawler are mainly network problems, such as a large number of nodes, a large number of threads, and a large number of network requests, but with a certain bandwidth, and in the case of no proxy, frequent requests and connection failures will increase, which will also have a certain impact on time, if random proxy libraries are used Things will be much better.
But what is certain is that after the scale-out increases the crawler nodes, it can greatly reduce our crawler time, which is also the benefit of the distributed crawler system.
7 Anti-crawler strategy used in crawler system
In the design of the whole crawler system, the following strategies are mainly used to achieve the purpose of anti-crawler:
Use proxy to access-- > IP proxy library, random IP proxy random top-level domain url access-- > url scheduling system each thread crawls a piece of commodity data sleep for a short period of time
For the above about the design and implementation of distributed crawler system practice analysis, we do not think it is very helpful. If you need to know more, please continue to follow our industry information. I'm sure you'll like it.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.