How to Implement Index Deduplication in Nutch

This article explains in detail how Nutch implements index deduplication, and it should serve as a useful reference. Interested readers are encouraged to read on!
I. Invocation from the main program
SolrDeleteDuplicates dedup = new SolrDeleteDuplicates();
dedup.setConf(getConf());
dedup.dedup(solrUrl);
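For reference, this is the same job that the Nutch 1.x launcher script exposes, so it can typically also be run from the command line; the exact command name may vary by version, and the Solr URL below is just an example:

bin/nutch solrdedup http://localhost:8983/solr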
II. Job task configuration

The dedup() method sets up a standard Hadoop job: SolrInputFormat supplies the records, the stock IdentityMapper passes them through, and NullOutputFormat is used because the reducer issues deletions directly to Solr rather than writing job output.
JobConf job = new NutchJob(getConf());
job.setInputFormat(SolrInputFormat.class);
job.setMapperClass(IdentityMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(SolrRecord.class);
job.setReducerClass(SolrDeleteDuplicates.class);
job.setOutputFormat(NullOutputFormat.class);
JobClient.runJob(job);
III. Input and output of the map and reduce tasks

Map task input and output: since the mapper is the stock IdentityMapper, each key/value pair is passed through unchanged:

public void map(K key, V val,
                OutputCollector<K, V> output, Reporter reporter)

Reduce task input and output:
Input: Text / Iterator<SolrRecord>
Output: Text / SolrRecord

public void reduce(Text key, Iterator<SolrRecord> values,
                   OutputCollector<Text, SolrRecord> output, Reporter reporter)
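Because the digest is the map output key, the shuffle phase groups every document with the same digest into a single reduce() call. A minimal, self-contained sketch of that grouping (plain Java, not Nutch code; the ids and digests are made up for illustration):

import java.util.*;

public class ShuffleSketch {
    public static void main(String[] args) {
        // hypothetical (id, digest) pairs for three indexed documents
        String[][] docs = {
                {"http://a/1", "abc"},
                {"http://b/1", "abc"},   // same digest as the first: a duplicate
                {"http://c/1", "def"}};
        // group ids by digest, as the MapReduce shuffle does with the map output key
        Map<String, List<String>> grouped = new HashMap<>();
        for (String[] d : docs) {
            grouped.computeIfAbsent(d[1], k -> new ArrayList<>()).add(d[0]);
        }
        // each reduce() call would see one group, e.g. abc -> [http://a/1, http://b/1]
        System.out.println(grouped);
    }
}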
IV. The job input class SolrInputFormat
The getSplits() method divides all documents evenly by count. The getRecordReader() method uses a SolrServer to query all the doc records contained in the current split, wraps each document in a SolrRecord, and returns a RecordReader whose next() method steps through them.
(1) The getSplits() method of SolrInputFormat
1. Get the SolrServer object from the job configuration.
2. Build and execute the query (parameters: *:*, field id, setRows(1)) to get the response object.
3. Read the total number of indexed documents from the response and divide it by the number of splits to get how many documents each split receives.
4. Create a SolrInputSplit array sized to the number of splits.
5. Instantiate each SolrInputSplit with its start position and document count.
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    // 1. Get the SolrServer object from the job configuration
    SolrServer solr = SolrUtils.getCommonsHttpSolrServer(job);
    // 2. Build and run the query (*:*, field id, one row) to get a response
    final SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY);
    solrQuery.setFields(SolrConstants.ID_FIELD);
    solrQuery.setRows(1);

    QueryResponse response;
    try {
        response = solr.query(solrQuery);
    } catch (final SolrServerException e) {
        throw new IOException(e);
    }

    // 3. Total document count divided by the number of splits
    int numResults = (int) response.getResults().getNumFound();
    int numDocsPerSplit = (numResults / numSplits);
    int currentDoc = 0;
    // 4. One SolrInputSplit per split
    SolrInputSplit[] splits = new SolrInputSplit[numSplits];
    // 5. Instantiate each split with its start position and document count
    for (int i = 0; i < numSplits - 1; i++) {
        splits[i] = new SolrInputSplit(currentDoc, numDocsPerSplit);
        currentDoc += numDocsPerSplit;
    }
    // the last split takes the remaining documents
    splits[splits.length - 1] = new SolrInputSplit(currentDoc, numResults - currentDoc);
    return splits;
}

(2) The getRecordReader() method of SolrInputFormat
1. Get the SolrServer object.
2. Cast the incoming split parameter to SolrInputSplit and get the total number of documents in this split.
3. Build the query object and execute the query (parameters: *:*, fields id, boost, tstamp, digest; start position taken from the SolrInputSplit; rows = the split's document count).
4. Get the result set from the response object.
5. Implement an anonymous inner RecordReader and return it.

public RecordReader<Text, SolrRecord> getRecordReader(final InputSplit split,
        final JobConf job, Reporter reporter) throws IOException {
    // 1. Get the SolrServer object
    SolrServer solr = SolrUtils.getCommonsHttpSolrServer(job);
    // 2. Cast the split parameter to SolrInputSplit and get its document count
    SolrInputSplit solrSplit = (SolrInputSplit) split;
    final int numDocs = solrSplit.getNumDocs();
    // 3. Build and run the query (*:*; fields id, boost, tstamp, digest;
    //    start = the split's begin position, rows = numDocs)
    SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY);
    solrQuery.setFields(SolrConstants.ID_FIELD, SolrConstants.BOOST_FIELD,
            SolrConstants.TIMESTAMP_FIELD, SolrConstants.DIGEST_FIELD);
    solrQuery.setStart(solrSplit.getDocBegin());
    solrQuery.setRows(numDocs);

    QueryResponse response;
    try {
        response = solr.query(solrQuery);
    } catch (final SolrServerException e) {
        throw new IOException(e);
    }

    // 4. Get the result set from the response
    final SolrDocumentList solrDocs = response.getResults();

    // 5. Return an anonymous RecordReader over the result set
    return new RecordReader<Text, SolrRecord>() {
        // index of the current document
        private int currentDoc = 0;

        public void close() throws IOException { }

        public Text createKey() {
            return new Text();
        }

        public SolrRecord createValue() {
            return new SolrRecord();
        }

        // current position
        public long getPos() throws IOException {
            return currentDoc;
        }

        // progress through this split
        public float getProgress() throws IOException {
            return currentDoc / (float) numDocs;
        }

        // advance to the next record
        public boolean next(Text key, SolrRecord value) throws IOException {
            if (currentDoc >= numDocs) {
                return false;
            }
            SolrDocument doc = solrDocs.get(currentDoc);
            // read the digest
            String digest = (String) doc.getFieldValue(SolrConstants.DIGEST_FIELD);
            // use the digest as the key
            key.set(digest);
            // populate the SolrRecord's id, tstamp and boost fields from the doc
            value.readSolrDocument(doc);
            // advance the pointer
            currentDoc++;
            return true;
        }
    };
}
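To make the split arithmetic in getSplits() concrete, here is a tiny standalone sketch (plain Java, not Nutch code): every split gets numResults / numSplits documents, and the last split absorbs the remainder:

public class SplitMath {
    public static void main(String[] args) {
        int numResults = 10, numSplits = 3;
        int numDocsPerSplit = numResults / numSplits; // 3
        int currentDoc = 0;
        for (int i = 0; i < numSplits - 1; i++) {
            System.out.println("split " + i + ": start=" + currentDoc
                    + " count=" + numDocsPerSplit);        // (0,3) and (3,3)
            currentDoc += numDocsPerSplit;
        }
        // the last split takes whatever is left: start=6 count=4
        System.out.println("split " + (numSplits - 1) + ": start=" + currentDoc
                + " count=" + (numResults - currentDoc));
    }
}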
V. Implementation of the map() and reduce() methods
(1) The map task

Since the mapper is an IdentityMapper, the map task passes every (digest, SolrRecord) pair through unchanged.

(2) The reduce task

Deduplication logic: the reduce task iterates over the records and executes the reduce() method once per digest. Inside reduce(), every remaining document with the same digest is compared against the record currently being kept: if its boost and timestamp are lower, SolrJ is asked to delete that document; if they are higher, the currently kept record is deleted instead and the better document becomes the kept one.
public void reduce(Text key, Iterator<SolrRecord> values,
        OutputCollector<Text, SolrRecord> output, Reporter reporter)
        throws IOException {
    // 1. The first SolrRecord becomes the record to keep
    SolrRecord recordToKeep = new SolrRecord(values.next());
    // 2. Compare every remaining SolrRecord against it
    while (values.hasNext()) {
        SolrRecord solrRecord = values.next();
        // boost and tstamp take part in the comparison: if the candidate
        // scores higher, or scores the same but is newer, delete the kept
        // record by id and keep the candidate instead
        if (solrRecord.getBoost() > recordToKeep.getBoost() ||
                (solrRecord.getBoost() == recordToKeep.getBoost() &&
                 solrRecord.getTstamp() > recordToKeep.getTstamp())) {
            updateRequest.deleteById(recordToKeep.id);
            recordToKeep = new SolrRecord(solrRecord);
        } else {
            updateRequest.deleteById(solrRecord.id);
        }
        numDeletes++;
        reporter.incrCounter("SolrDedupStatus", "Deleted documents", 1);
        // flush the batched deletes once the threshold is reached
        if (numDeletes >= NUM_MAX_DELETE_REQUEST) {
            try {
                LOG.info("SolrDeleteDuplicates: deleting " + numDeletes + " duplicates");
                updateRequest.process(solr);
            } catch (SolrServerException e) {
                throw new IOException(e);
            }
            updateRequest = new UpdateRequest();
            numDeletes = 0;
        }
    }
}
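The keep/delete rule can be exercised in isolation. The sketch below (plain Java, not Nutch code; Rec and pickWinner are invented for illustration) replays the comparison: higher boost wins, and on a boost tie the newer timestamp wins:

public class DedupRule {
    static class Rec {
        final String id; final float boost; final long tstamp;
        Rec(String id, float boost, long tstamp) {
            this.id = id; this.boost = boost; this.tstamp = tstamp;
        }
    }

    // replay the rule from reduce(): higher boost wins; on a tie, newer tstamp wins
    static Rec pickWinner(Rec kept, Rec candidate) {
        if (candidate.boost > kept.boost ||
                (candidate.boost == kept.boost && candidate.tstamp > kept.tstamp)) {
            System.out.println("delete " + kept.id); // candidate replaces the kept record
            return candidate;
        }
        System.out.println("delete " + candidate.id); // kept record survives
        return kept;
    }

    public static void main(String[] args) {
        Rec kept = new Rec("url1", 1.0f, 100L);
        kept = pickWinner(kept, new Rec("url2", 1.0f, 200L)); // boost tie, newer: deletes url1
        kept = pickWinner(kept, new Rec("url3", 0.5f, 300L)); // lower boost: deletes url3
        System.out.println("kept: " + kept.id); // url2
    }
}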
VI. About digest
The digest field on the doc is added in the reduce() method of the IndexerMapReduce class:

// add digest, used by dedup
doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));

metadata is backed by a HashMap and is taken from the parse data:

final Metadata metadata = parseData.getContentMeta();
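For context, the signature stored under Nutch.SIGNATURE_KEY is produced by Nutch's signature implementation, which by default is an MD5-style hash of the fetched content. A minimal illustration of computing such a digest with only the JDK (not Nutch's actual code) could look like this:

import java.math.BigInteger;
import java.security.MessageDigest;

public class DigestSketch {
    public static void main(String[] args) throws Exception {
        byte[] content = "<html>page content</html>".getBytes();
        // hash the raw content; identical pages yield identical digests
        byte[] md5 = MessageDigest.getInstance("MD5").digest(content);
        String digest = new BigInteger(1, md5).toString(16);
        System.out.println(digest); // hex string used as the dedup key
    }
}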
That is all of the content of "How to Implement Index Deduplication in Nutch". Thank you for reading, and I hope it serves as a useful reference!