
How to use kafka in nginx lua

Updated 2025-03-26. Source: SLTechnology News&Howtos (Servers)


This article shows how to use Kafka from Lua scripts running in nginx (OpenResty), step by step.

Step 1: enter the OpenResty directory

    [root@node03 openresty]# cd /export/servers/openresty/
    [root@node03 openresty]# ll
    total 356
    drwxr-xr-x  2 root root   4096 Jul 26 11:33 bin
    drwxrwxr-x 44 1000 1000   4096 Jul 26 11:31 build
    drwxrwxr-x 43 1000 1000   4096 Nov 13  2017 bundle
    -rwxrwxr-x  1 1000 1000  45908 Nov 13  2017 configure
    -rw-rw-r--  1 1000 1000  22924 Nov 13  2017 COPYRIGHT
    drwxr-xr-x  6 root root   4096 Jul 26 11:33 luajit
    drwxr-xr-x  6 root root   4096 Aug  1 08:14 lualib
    -rw-r--r--  1 root root   5413 Jul 26 11:32 Makefile
    drwxr-xr-x 11 root root   4096 Jul 26 11:35 nginx
    drwxrwxr-x  2 1000 1000   4096 Nov 13  2017 patches
    drwxr-xr-x 44 root root   4096 Jul 26 11:33 pod
    -rw-rw-r--  1 1000 1000   3689 Nov 13  2017 README.markdown
    -rw-rw-r--  1 1000 1000   8690 Nov 13  2017 README-win32.txt
    -rw-r--r--  1 root root 218352 Jul 26 11:33 resty.index
    drwxr-xr-x  5 root root   4096 Jul 26 11:33 site
    drwxr-xr-x  2 root root   4096 Aug  1 10:54 testlua
    drwxrwxr-x  2 1000 1000   4096 Nov 13  2017 util
    [root@node03 openresty]#

Note: two directories matter here, lualib and nginx.

1. lualib: holds the Lua libraries bundled with OpenResty

2. nginx: the nginx service directory

Next, go into the lualib directory and look at its contents:

    [root@node03 openresty]# cd lualib/
    [root@node03 lualib]# ll
    total 116
    -rwxr-xr-x 1 root root 101809 Jul 26 11:33 cjson.so
    drwxr-xr-x 3 root root   4096 Jul 26 11:33 ngx
    drwxr-xr-x 2 root root   4096 Jul 26 11:33 rds
    drwxr-xr-x 2 root root   4096 Jul 26 11:33 redis
    drwxr-xr-x 9 root root   4096 Aug  1 10:34 resty

Here we see the redis and ngx packages, which means nginx and redis can be used together from Lua without importing any additional dependencies.

Now look at the contents of resty:

    [root@node03 lualib]# cd resty/
    [root@node03 resty]# ll
    total 152
    -rw-r--r-- 1 root root  6409 Jul 26 11:33 aes.lua
    drwxr-xr-x 2 root root  4096 Jul 26 11:33 core
    -rw-r--r-- 1 root root   596 Jul 26 11:33 core.lua
    drwxr-xr-x 2 root root  4096 Jul 26 11:33 dns
    drwxr-xr-x 2 root root  4096 Aug  1 10:42 kafka      # this is our own import
    drwxr-xr-x 2 root root  4096 Jul 26 11:33 limit
    -rw-r--r-- 1 root root  4616 Jul 26 11:33 lock.lua
    drwxr-xr-x 2 root root  4096 Jul 26 11:33 lrucache
    -rw-r--r-- 1 root root  4620 Jul 26 11:33 lrucache.lua
    -rw-r--r-- 1 root root  1211 Jul 26 11:33 md5.lua
    -rw-r--r-- 1 root root 14544 Jul 26 11:33 memcached.lua
    -rw-r--r-- 1 root root 21577 Jul 26 11:33 mysql.lua
    -rw-r--r-- 1 root root   616 Jul 26 11:33 random.lua
    -rw-r--r-- 1 root root  9227 Jul 26 11:33 redis.lua
    -rw-r--r-- 1 root root  1192 Jul 26 11:33 sha1.lua
    -rw-r--r-- 1 root root  1045 Jul 26 11:33 sha224.lua
    -rw-r--r-- 1 root root  1221 Jul 26 11:33 sha256.lua
    -rw-r--r-- 1 root root  1045 Jul 26 11:33 sha384.lua
    -rw-r--r-- 1 root root  1359 Jul 26 11:33 sha512.lua
    -rw-r--r-- 1 root root   236 Jul 26 11:33 sha.lua
    -rw-r--r-- 1 root root   698 Jul 26 11:33 string.lua
    -rw-r--r-- 1 root root  5178 Jul 26 11:33 upload.lua
    drwxr-xr-x 2 root root  4096 Jul 26 11:33 upstream
    drwxr-xr-x 2 root root   406 Jul 26 11:33 websocket

Here we see the familiar mysql.lua and redis.lua; the rest can be ignored for now.

Note: OpenResty does not bundle a kafka package, so there is no built-in Kafka integration. The kafka directory shown above was imported by hand in advance.

Let's look at the modules inside the kafka package:

    [root@node03 resty]# cd kafka
    [root@node03 kafka]# ll
    total 48
    -rw-r--r-- 1 root root  1369 Aug  1 10:42 broker.lua
    -rw-r--r-- 1 root root  5537 Aug  1 10:42 client.lua
    -rw-r--r-- 1 root root   710 Aug  1 10:42 errors.lua
    -rw-r--r-- 1 root root 10718 Aug  1 10:42 producer.lua
    -rw-r--r-- 1 root root  4072 Aug  1 10:42 request.lua
    -rw-r--r-- 1 root root  2118 Aug  1 10:42 response.lua
    -rw-r--r-- 1 root root  1494 Aug  1 10:42 ringbuffer.lua
    -rw-r--r-- 1 root root  4845 Aug  1 10:42 sendbuffer.lua

Attached is the kafka integration package: kafka_jb51.rar

Step 2: create a kafka test lua file

1. Go back to the openresty directory

    [root@node03 kafka]# cd /export/servers/openresty/

2. Create a test directory

    [root@node03 openresty]# mkdir -p testlua

The directory name and location are up to you, but remember the path: it is needed in Step 3.

3. Go into the directory you just created and create the kafkalua.lua script file

Create the file with vim kafkalua.lua or touch kafkalua.lua:

    [root@node03 openresty]# cd testlua/
    [root@node03 testlua]# ll
    total 8
    -rw-r--r-- 1 root root 3288 Aug  1 10:54 kafkalua.lua

kafkalua.lua:

    -- Test statement; optional
    ngx.say('hello kafka file configuration successful')

    -- Collection threshold: if the number of active connections exceeds it, skip collection
    local DEFAULT_THRESHOLD = 1000000
    -- Number of kafka partitions used for round-robin
    local PARTITION_NUM = 6
    -- kafka topic name
    local TOPIC = 'B2CDATA_COLLECTION1'
    -- Key of the shared round-robin counter
    local POLLING_KEY = "POLLING_KEY"

    -- Partitioner: the key already carries the round-robin value.
    -- "% num" keeps the result within the topic's actual partition count
    -- (Step 6 creates the topic with 3 partitions, while PARTITION_NUM is 6).
    local function partitioner(key, num, correlation_id)
        return tonumber(key) % num
    end

    -- kafka broker list (the IPs must match the host.name setting of each kafka broker)
    local BROKER_LIST = {
        { host = "192.168.52.100", port = 9092 },
        { host = "192.168.52.110", port = 9092 },
        { host = "192.168.52.120", port = 9092 },
    }

    -- kafka producer parameters
    local CONNECT_PARAMS = {
        producer_type = "async",
        socket_timeout = 30000,
        flush_time = 10000,
        request_timeout = 20000,
        partitioner = partitioner,
    }

    -- Shared-memory counter used for kafka round-robin
    local shared_data = ngx.shared.shared_data
    local pollingVal = shared_data:get(POLLING_KEY)
    if not pollingVal then
        pollingVal = 1
        shared_data:set(POLLING_KEY, pollingVal)
    end

    -- Take the counter modulo PARTITION_NUM so messages are spread evenly over partitions
    local partitions = '' .. (tonumber(pollingVal) % PARTITION_NUM)
    shared_data:incr(POLLING_KEY, 1)

    -- Concurrency control
    local isGone = true
    -- Overload protection: skip collection when the active connections exceed the threshold
    if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESHOLD) then
        isGone = false
    end

    -- Data collection
    if isGone then
        -- Unset nginx variables are nil; default them to ""
        local time_local = ngx.var.time_local or ""
        local request = ngx.var.request or ""
        local request_method = ngx.var.request_method or ""
        local content_type = ngx.var.content_type or ""
        ngx.req.read_body()
        local request_body = ngx.var.request_body or ""
        local http_referer = ngx.var.http_referer or ""
        local remote_addr = ngx.var.remote_addr or ""
        local http_user_agent = ngx.var.http_user_agent or ""
        local time_iso8601 = ngx.var.time_iso8601 or ""
        local server_addr = ngx.var.server_addr or ""
        local http_cookie = ngx.var.http_cookie or ""

        -- Assemble the message; fields are separated by "#CS#"
        local message = time_local .. "#CS#" .. request .. "#CS#" .. request_method .. "#CS#" ..
            content_type .. "#CS#" .. request_body .. "#CS#" .. http_referer .. "#CS#" ..
            remote_addr .. "#CS#" .. http_user_agent .. "#CS#" .. time_iso8601 .. "#CS#" ..
            server_addr .. "#CS#" .. http_cookie

        -- Load the kafka producer module
        local producer = require "resty.kafka.producer"
        -- Create the producer
        local bp = producer:new(BROKER_LIST, CONNECT_PARAMS)
        -- Send the data
        local ok, err = bp:send(TOPIC, partitions, message)
        -- Log any error
        if not ok then
            ngx.log(ngx.ERR, "kafka send err:", err)
            return
        end
    end
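The script joins eleven request fields into one string with "#CS#" as the separator and replaces missing values with empty strings. The Java sketch below mirrors that record format and shows how a consumer might split a record back into named fields. The names CollectRecord, join and parse are hypothetical and not part of the original article, and the sketch assumes that no field contains "#CS#" itself.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper (not from the article): models the "#CS#" record format of kafkalua.lua.
final class CollectRecord {
    static final String SEP = "#CS#";

    // Field order as assembled in kafkalua.lua
    static final List<String> FIELDS = Arrays.asList(
            "time_local", "request", "request_method", "content_type", "request_body",
            "http_referer", "remote_addr", "http_user_agent", "time_iso8601",
            "server_addr", "http_cookie");

    /** Joins the values with SEP; a null value (unset nginx variable) becomes "". */
    static String join(List<String> values) {
        return values.stream()
                .map(v -> v == null ? "" : v)
                .collect(Collectors.joining(SEP));
    }

    /** Splits one record into a field-name to value map, keeping empty fields. */
    static Map<String, String> parse(String message) {
        // limit -1 keeps trailing empty fields such as an empty cookie
        String[] parts = message.split(Pattern.quote(SEP), -1);
        if (parts.length != FIELDS.size()) {
            throw new IllegalArgumentException(
                    "expected " + FIELDS.size() + " fields, got " + parts.length);
        }
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            record.put(FIELDS.get(i), parts[i]);
        }
        return record;
    }

    public static void main(String[] args) {
        String message = join(Arrays.asList(
                "01/Aug/2018:10:54:00 +0800", "POST /kafkalua HTTP/1.1", "POST", null,
                "json=%7B%7D", null, "192.168.52.1", "demo-agent",
                "2018-08-01T10:54:00+08:00", "192.168.52.120", null));
        System.out.println(message);
        System.out.println(parse(message).get("remote_addr"));
    }
}
```

Because the separator is a plain string, a request body or cookie that happens to contain "#CS#" would shift the fields; a real pipeline would need to escape or validate for that case.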

Step 3: modify the nginx configuration file nginx.conf

1. Enter the nginx/conf directory

    [root@node03 openresty]# cd /export/servers/openresty/nginx/conf/
    [root@node03 conf]# ll
    total 76
    -rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf
    -rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf.default
    -rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params
    -rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params.default
    -rw-r--r-- 1 root root 2837 Jul 26 11:33 koi-utf
    -rw-r--r-- 1 root root 2223 Jul 26 11:33 koi-win
    -rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types
    -rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types.default
    -rw-r--r-- 1 root root 3191 Aug  1 10:52 nginx.conf
    -rw-r--r-- 1 root root 2656 Jul 26 11:33 nginx.conf.default
    -rw-r--r-- 1 root root  636 Jul 26 11:33 scgi_params
    -rw-r--r-- 1 root root  636 Jul 26 11:33 scgi_params.default
    -rw-r--r-- 1 root root  664 Jul 26 11:33 uwsgi_params
    -rw-r--r-- 1 root root  664 Jul 26 11:33 uwsgi_params.default
    -rw-r--r-- 1 root root 3610 Jul 26 11:33 win-utf

2. Modify nginx.conf

    [root@node03 conf]# vim nginx.conf

    # 1. Find the first server block
    # 2. Add the two directives below directly above server
    # 3. Add the kafka-related location inside server

    #------ added code ------
    # Open a shared dictionary with 10m of memory, available to every nginx worker
    lua_shared_dict shared_data 10m;
    # Configure local domain name resolution
    resolver 127.0.0.1;
    #------ added code ------

    server {
        listen       80;
        server_name  localhost;

        #charset koi8-r;
        #access_log  logs/host.access.log  main;

        location / {
            root   html;
            index  index.html index.htm;
        }

        #------ added code ------
        location /kafkalua {
            # kafkalua is the name chosen for this example; there is no default
            # enable nginx status monitoring
            stub_status on;
            # default response type
            default_type text/html;
            # location of the kafka Lua file, i.e. the kafkalua.lua created in Step 2
            content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
        }
        #------ added code ------
    }
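lua_shared_dict declares a memory zone that all nginx worker processes share, which is what lets kafkalua.lua keep a single round-robin counter across workers. The Java sketch below is only an analogy: threads stand in for workers and a ConcurrentHashMap stands in for the shared dictionary. SharedCounter and incr are hypothetical names. It shows the counter being created and incremented in one atomic step, so two workers cannot both initialize it.

```java
import java.util.concurrent.ConcurrentHashMap;

// Analogy only: a thread-safe counter map standing in for an nginx shared dictionary.
final class SharedCounter {
    private final ConcurrentHashMap<String, Long> dict = new ConcurrentHashMap<>();

    /** Atomically adds 1 to key, starting from init when the key is absent; returns the new value. */
    long incr(String key, long init) {
        return dict.merge(key, init + 1, (current, ignored) -> current + 1);
    }

    public static void main(String[] args) throws InterruptedException {
        SharedCounter counter = new SharedCounter();
        Thread[] workers = new Thread[4];
        for (int w = 0; w < workers.length; w++) {
            workers[w] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    counter.incr("POLLING_KEY", 0);
                }
            });
            workers[w].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        // 4 workers x 1000 increments, plus this one
        System.out.println(counter.incr("POLLING_KEY", 0)); // prints 4001
    }
}
```

In OpenResty the counterpart is the dictionary's own incr method. Recent lua-nginx-module releases document an optional init argument for it, which is worth checking against the version you run.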

Note: in location /kafkalua {...}, kafkalua is the location name (the project name of this example). You can choose any name, but remember it, because it becomes the path in the request URL.

We now have two locations configured: location / {...} and location /kafkalua {...}. What is the difference between them? Read on; Step 4 makes it clear.

Step 4: start nginx

1. Enter nginx/sbin

    [root@node03 sbin]# cd /export/servers/openresty/nginx/sbin/
    [root@node03 sbin]# ll
    total 16356
    -rwxr-xr-x 1 root root 16745834 Jul 26 11:33 nginx

2. Test whether the configuration file is correct

    [root@node03 sbin]# nginx -t
    nginx: the configuration file /export/servers/openresty/nginx/conf/nginx.conf syntax is ok
    nginx: configuration file /export/servers/openresty/nginx/conf/nginx.conf test is successful

Both lines report success.

3. Start nginx

    [root@node03 sbin]# nginx

No output generally means nginx started successfully.

4. Check that nginx started successfully

    [root@node03 sbin]# ps -ef | grep nginx
    root      3730     1  0 09:24 ?        00:00:00 nginx: master process nginx
    nobody    3731  3730  0 09:24 ?        00:00:20 nginx: worker process is shutting down
    nobody    5766  3730  0 12:17 ?        00:00:00 nginx: worker process
    root      5824  3708  0 12:24 pts/1    00:00:00 grep nginx

The nginx master and worker processes are listed, which indicates success.

5. Access nginx from a browser

Enter node03/kafkalua in the browser.

Note: if node03 is not configured in your hosts file, enter the IP address of the machine running OpenResty instead, for example 192.168.52.120/kafkalua.

Enter node03/ or 192.168.52.120/ in the browser.

Then enter node03:80/kafkalua and node03:80/ in the browser and compare the results with nginx.conf.

In node03:80/kafkalua, node03 is the server's host alias (the server's IP address works too), and 80 is the listening port configured by [listen 80;]. Port 80 can be omitted. If the directive were [listen 8088;], the browser would need node03:8088/kafkalua (8088 cannot be omitted). kafkalua is the location name.
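The port rule described above can be checked with a small Java sketch. It assumes plain http URLs, for which an omitted port means the default port 80. PortDemo and effectivePort are hypothetical names used only for illustration.

```java
import java.net.URI;

// Illustration only: which port an http URL resolves to when the port is omitted.
final class PortDemo {
    /** Returns the explicit port of the URL, or 80 (the http default) when none is given. */
    static int effectivePort(String url) {
        int port = URI.create(url).getPort(); // -1 when the URL carries no explicit port
        return port == -1 ? 80 : port;
    }

    public static void main(String[] args) {
        System.out.println(effectivePort("http://node03/kafkalua"));      // prints 80
        System.out.println(effectivePort("http://node03:80/kafkalua"));   // prints 80
        System.out.println(effectivePort("http://node03:8088/kafkalua")); // prints 8088
    }
}
```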

    server {
        listen       80;
        server_name  localhost;

        #charset koi8-r;
        #access_log  logs/host.access.log  main;

        location / {
            root   html;
            index  index.html index.htm;
        }

        #------ added code ------
        location /kafkalua {
            # kafkalua is the name chosen for this example; there is no default
            # enable nginx status monitoring
            stub_status on;
            # default response type
            default_type text/html;
            # location of the kafka Lua file, i.e. the kafkalua.lua created in Step 2
            content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
        }
    }
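The two location blocks differ in which request paths they handle. As a simplified model that covers prefix locations only (no regex, "=" or "^~" modifiers), nginx selects the longest configured prefix that the request path starts with. The Java sketch below reproduces that rule for the two locations in this configuration. LocationMatch and match are hypothetical names, and the sketch is not nginx's actual implementation.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of nginx prefix-location selection (longest matching prefix wins).
final class LocationMatch {
    /** Returns the longest location that is a prefix of path, or null if none matches. */
    static String match(List<String> locations, String path) {
        String best = null;
        for (String location : locations) {
            if (path.startsWith(location)
                    && (best == null || location.length() > best.length())) {
                best = location;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> locations = Arrays.asList("/", "/kafkalua");
        System.out.println(match(locations, "/"));                         // prints /
        System.out.println(match(locations, "/index.html"));               // prints /
        System.out.println(match(locations, "/kafkalua"));                 // prints /kafkalua
        System.out.println(match(locations, "/kafkalua/B2C40/flight.css")); // prints /kafkalua
    }
}
```

Under this model, every URL that begins with /kafkalua is handled by the Lua script, while everything else falls through to the static files served by location /.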

Step 5: create a test crawler

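1. Create a maven project and import the dependencies

    <dependency>
        <groupId>org.jsoup</groupId>
        <artifactId>jsoup</artifactId>
        <version>1.11.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.4</version>
    </dependency>

2. Mock crawler program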

    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.GregorianCalendar;
    import java.util.List;
    import java.util.Locale;

    import org.apache.http.NameValuePair;
    import org.apache.http.client.entity.UrlEncodedFormEntity;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.message.BasicNameValuePair;

    public class SpiderGoAirCN {

        private static String basePath = "http://node03/kafkalua";

        private static final String REFERER_PAGE =
                "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html";
        private static final String FIXED_REFERER =
                REFERER_PAGE + "?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0";
        private static final String FIXED_DATE = "2018-01-17";

        // Form payload sent with every request
        private static final String QUERY_JSON =
                "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", "
                + "\"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", "
                + "\"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", "
                + "\"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}";

        public static void main(String[] args) throws Exception {
            for (int i = 0; i < 50000; i++) {
                spiderQueryao(); // query request
                spiderHtml();    // html request
                spiderJs();      // js request
                spiderCss();     // css request
                spiderPng();     // png request
                spiderJpg();     // jpg request
                Thread.sleep(100);
            }
        }

        // Target pattern: ^.*/B2C40/query/jaxb/direct/query.ao.*$
        public static void spiderQueryao() throws Exception {
            String today = getGoTime();
            post(basePath + "/B2C40/query/jaxb/direct/query.ao",
                    REFERER_PAGE + "?t=S&c1=CAN&c2=WUH&d1=" + today + "&at=1&ct=0&it=0",
                    "192.168.56.80", "243.45.78.132", today);
        }

        // Target pattern: ^.*html.*$
        public static void spiderHtml() throws Exception {
            post(basePath + "/B2C40/modules/bookingnew/main/flightSelectDirect.html"
                    + "?t=S&c1=CAN&c2=CTU&d1=2018-01-17&at=1&ct=0&it=0",
                    FIXED_REFERER, "192.168.56.1", "192.168.56.80", FIXED_DATE);
        }

        public static void spiderJs() throws Exception {
            post(basePath + "/B2C40/dist/main/modules/common/requireConfig.js",
                    FIXED_REFERER, "192.168.56.1", "192.168.56.80", FIXED_DATE);
        }

        public static void spiderCss() throws Exception {
            post(basePath + "/B2C40/dist/main/css/flight.css",
                    REFERER_PAGE, "192.168.56.1", "192.168.56.80", FIXED_DATE);
        }

        public static void spiderPng() throws Exception {
            post(basePath + "/B2C40/dist/main/images/common.png",
                    FIXED_REFERER, "192.168.56.1", "192.168.56.80", FIXED_DATE);
        }

        public static void spiderJpg() throws Exception {
            post(basePath + "/B2C40/dist/main/images/loadingimg.jpg",
                    FIXED_REFERER, "192.168.56.1", "192.168.56.80", FIXED_DATE);
        }

        // Shared request logic: set the headers, post the form, print whether a response arrived
        private static void post(String url, String referer, String remoteAddr,
                                 String serverAddr, String cookieDate) throws Exception {
            // 1. Build the request for the target URL
            HttpPost httpPost = new HttpPost(url);
            // 2. Set the request headers
            httpPost.setHeader("Time-Local", getLocalDateTime());
            httpPost.setHeader("Requst", "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
            httpPost.setHeader("Request Method", "POST");
            httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8");
            httpPost.setHeader("Referer", referer);
            httpPost.setHeader("Remote Address", remoteAddr);
            httpPost.setHeader("User-Agent",
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
            httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
            httpPost.setHeader("Server Address", serverAddr);
            httpPost.setHeader("Cookie", cookie(cookieDate));
            // 3. Set the form parameters
            List<NameValuePair> parameters = new ArrayList<>();
            parameters.add(new BasicNameValuePair("json", QUERY_JSON));
            httpPost.setEntity(new UrlEncodedFormEntity(parameters));
            // 4. Send the request; client and response are closed automatically
            try (CloseableHttpClient httpClient = HttpClients.createDefault();
                 CloseableHttpResponse response = httpClient.execute(httpPost)) {
                // 5. Print whether a response was received
                System.out.println(response != null);
            }
        }

        // Cookie header; date is the departure date embedded in two places
        private static String cookie(String date) {
            return "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; "
                    + "aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; "
                    + "userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; "
                    + "temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D"
                    + date
                    + "%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; "
                    + "JSESSIONID=782121159357B98CA6112554CF44321E; "
                    + "WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; "
                    + "language=zh_CN; "
                    + "WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1("
                    + date + ")";
        }

        public static String getLocalDateTime() {
            DateFormat df = new SimpleDateFormat("dd/MMM/yyyy'T'HH:mm:ss +08:00", Locale.ENGLISH);
            return df.format(new Date());
        }

        public static String getISO8601Timestamp() {
            DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss+08:00");
            return df.format(new Date());
        }

        public static String getGoTime() {
            DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
            return df.format(new Date());
        }

        public static String getBackTime() {
            Calendar calendar = new GregorianCalendar();
            calendar.setTime(new Date()); // current time
            calendar.add(Calendar.DATE, 1); // move one day forward; use a negative value to move backward
            return new SimpleDateFormat("yyyy-MM-dd").format(calendar.getTime());
        }
    }

Step 6: start kafka

1. Create the topic

    [root@node01 bin]# kafka-topics.sh --zookeeper node01:2181 --partitions 3 --replication-factor 3 --create --topic B2CDATA_COLLECTION1

2. Start a kafka console consumer

    [root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic B2CDATA_COLLECTION1

Step 7: start the crawler and observe the result

1. Start the crawler program

2. Observe the consumer window

Step 8: start kafka-manager and observe

1. Start kafka-manager

    [root@node01 conf]# cd /export/servers/kafka-manager-1.3.3.23/bin/
    [root@node01 bin]# ll
    total 36
    -rwxr-xr-x 1 root root 13747 May  1 06:27 kafka-manager
    -rw-r--r-- 1 root root  9975 May  1 06:27 kafka-manager.bat
    -rwxr-xr-x 1 root root  1383 May  1 06:27 log-config
    -rw-r--r-- 1 root root   105 May  1 06:27 log-config.bat
    [root@node01 bin]# # launch
    [root@node01 bin]# ./kafka-manager

The window after startup:

2. Browser access

Enter node01:9000 in the browser.

Using kafka-manager itself is not covered here. Look at the consumption of the B2CDATA_COLLECTION1 topic:

There are three partitions; if every partition shows messages being consumed, the setup works.

If the messages are concentrated in one partition instead, no partition policy is configured in the kafkalua.lua script, and the default partitioning causes data skew. In that case, configure your own partition policy.
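To see why a partition policy matters, the Java sketch below counts how messages spread over three partitions under two simplified policies: a round-robin counter taken modulo the partition count (the approach kafkalua.lua uses), and a fixed key that always maps to the same partition. The fixed key is only an assumed stand-in for the skewed case; it does not model how the real default partitioner behaves. RoundRobinDemo and distribute are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

// Illustration only: message counts per partition with and without round-robin keys.
final class RoundRobinDemo {
    /** Returns how many of the given messages land on each partition. */
    static int[] distribute(int messages, int partitions, boolean roundRobin) {
        int[] counts = new int[partitions];
        AtomicLong counter = new AtomicLong(1); // stands in for the shared counter, starting at 1
        for (int i = 0; i < messages; i++) {
            // round-robin: the next counter value; otherwise the same key every time
            long key = roundRobin ? counter.getAndIncrement() : 1L;
            counts[(int) (key % partitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distribute(9, 3, true)));  // prints [3, 3, 3]
        System.out.println(Arrays.toString(distribute(9, 3, false))); // prints [0, 9, 0]
    }
}
```

The even counts in the first line correspond to the healthy picture in kafka-manager, and the second line corresponds to data skew.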

That covers how to use kafka in nginx lua.
