2025-01-16 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article looks at how to deal with the problems encountered when inserting and querying massive amounts of data with Spark-Redis. Many readers may not be familiar with these problems, so the main points are summarized below.
Because Redis is an in-memory database, its stability is limited, especially in standalone mode. As a result, we run into many problems when using Spark-Redis at work, particularly when inserting or querying massive amounts of data.
Massive data query
Redis serves reads from memory, so it reads faster than most other databases. But when a query has to return tens of millions of records, even Redis takes a long time. If we then want to terminate the select job, we want all of its running tasks to be killed immediately.
Spark has a job scheduling mechanism. SparkContext is the entry point to Spark, comparable to the main function of an application. The cancelJobGroup function in SparkContext cancels the running jobs of a group:
    /**
     * Cancel active jobs for the specified group. See `org.apache.spark.SparkContext.setJobGroup`
     * for more information.
     */
    def cancelJobGroup(groupId: String) {
      assertNotStopped()
      dagScheduler.cancelJobGroup(groupId)
    }
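To make the call pattern concrete, here is a minimal sketch of how a job group might be set and then cancelled from another thread. It assumes Spark is on the classpath and runs in local mode; the object name, the group id, and the deliberately slow job are illustrative and do not come from Spark-Redis.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object CancelJobGroupSketch {
  // Runs a deliberately slow job under `groupId`, cancels the group from the
  // calling thread, and returns true if the job ended with a SparkException.
  def runAndCancel(sc: SparkContext, groupId: String): Boolean = {
    val job = Future {
      // The job group is a thread-local property, so it is set in the
      // thread that actually submits the job.
      sc.setJobGroup(groupId, "slow select (illustrative)", interruptOnCancel = true)
      try sc.parallelize(1 to 4, 4).map { i => Thread.sleep(60000L); i }.count()
      finally sc.clearJobGroup()
    }

    // Wait (at most about 10 s) until the scheduler has registered a job for the group.
    var polls = 0
    while (sc.statusTracker.getJobIdsForGroup(groupId).isEmpty && polls < 100) {
      Thread.sleep(100L)
      polls += 1
    }

    sc.cancelJobGroup(groupId)

    Try(Await.result(job, 30.seconds)) match {
      case Failure(_: SparkException) => true  // the job was cancelled
      case _                          => false // finished, timed out, or failed otherwise
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("cancel-sketch")
    val sc = new SparkContext(conf)
    try println(s"cancelled: ${runAndCancel(sc, "select-job")}")
    finally sc.stop()
  }
}
```

Passing interruptOnCancel = true asks Spark to interrupt the task threads on cancellation. A task only stops promptly if its code reacts to the interrupt, which is exactly where RedisRDD needs help.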
In theory, cancelling a job should also terminate all of the tasks under it. When we cancel the select job, the executor throws TaskKilledException for a task that was already killed before it started running, and the TaskContext of a running task provides killTaskIfInterrupted, which throws TaskKilledException once the task has been interrupted.
    // If this task has been killed before we deserialized it, let's quit now. Otherwise,
    // continue executing the task.
    val killReason = reasonIfKilled
    if (killReason.isDefined) {
      // Throw an exception rather than returning, because returning within a try{} block
      // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
      // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
      // for the task.
      throw new TaskKilledException(killReason.get)
    }

    /**
     * If the task is interrupted, throws TaskKilledException with the reason for the interrupt.
     */
    private[spark] def killTaskIfInterrupted(): Unit
In Spark-Redis, however, the job is cancelled but its tasks keep running. The computing logic of a task is ultimately implemented in RedisRDD, whose compute method fetches the keys through Jedis. So to solve this problem, the task that is running inside RedisRDD has to be cancelled. There are two ways to do this:
Method 1: follow Spark's JDBCRDD, define a close() method, and combine it with InterruptibleIterator.

    def close() {
      if (closed) return
      try {
        if (null != rs) {
          rs.close()
        }
      } catch {
        case e: Exception => logWarning("Exception closing resultset", e)
      }
      try {
        if (null != stmt) {
          stmt.close()
        }
      } catch {
        case e: Exception => logWarning("Exception closing statement", e)
      }
      try {
        if (null != conn) {
          if (!conn.isClosed && !conn.getAutoCommit) {
            try {
              conn.commit()
            } catch {
              case NonFatal(e) => logWarning("Exception committing transaction", e)
            }
          }
          conn.close()
        }
        logInfo("closed connection")
      } catch {
        case e: Exception => logWarning("Exception closing connection", e)
      }
      closed = true
    }

    context.addTaskCompletionListener { context => close() }

    CompletionIterator[InternalRow, Iterator[InternalRow]](
      new InterruptibleIterator(context, rowsIterator), close())

Method 2: run compute in an asynchronous thread and check whether the task has been interrupted.

    try {
      val thread = new Thread() {
        override def run(): Unit = {
          try {
            keys = doCall
          } catch {
            case e: Exception => logWarning("execute http require failed.", e)
          }
          isRequestFinished = true
        }
      }

      // stop waiting for the request if the user interrupts the job
      thread.start()
      while (!context.isInterrupted() && !isRequestFinished) {
        Thread.sleep(GetKeysWaitInterval)
      }
      if (context.isInterrupted() && !isRequestFinished) {
        logInfo(s"try to kill task ${context.getKillReason()}")
        context.killTaskIfInterrupted()
      }
      thread.join()
      CompletionIterator[T, Iterator[T]](
        new InterruptibleIterator(context, keys), close)
      // ... (the rest of the try block is not shown in the original)
With the second method, the key fetch runs asynchronously in its own thread while the task thread polls whether the task has been interrupted; if it has, the task thread calls the TaskContext's killTaskIfInterrupted. In case killTaskIfInterrupted alone does not stop the task, the result is also wrapped in an InterruptibleIterator, an iterator that provides task termination by checking the interrupt flag in TaskContext.
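The idea behind Method 2 can be modelled in plain Scala without any Spark dependency. The sketch below is not the Spark-Redis code. An AtomicBoolean stands in for the interrupt flag of TaskContext, TaskKilled stands in for TaskKilledException, and every name in it is hypothetical.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.util.Try

object InterruptibleFetchSketch {
  // Stand-in for Spark's TaskKilledException (hypothetical, not the Spark class).
  final class TaskKilled(reason: String) extends RuntimeException(reason)

  // Checks the kill flag before every element, which is the idea
  // behind Spark's InterruptibleIterator.
  final class FlagCheckingIterator[T](killed: AtomicBoolean, underlying: Iterator[T])
      extends Iterator[T] {
    override def hasNext: Boolean = {
      if (killed.get()) throw new TaskKilled("task was cancelled")
      underlying.hasNext
    }
    override def next(): T = underlying.next()
  }

  // Runs `fetch` on a helper thread while the calling thread polls the kill flag.
  def fetchInterruptibly[T](killed: AtomicBoolean, pollMillis: Long)(fetch: => Seq[T]): Iterator[T] = {
    val result = new AtomicReference[Option[Try[Seq[T]]]](None)
    val worker = new Thread(new Runnable {
      override def run(): Unit =
        try result.set(Some(Try(fetch)))
        catch { case _: InterruptedException => () } // the fetch was abandoned
    })
    worker.setDaemon(true)
    worker.start()

    // Wait until the fetch finishes or the task is killed, whichever comes first.
    while (!killed.get() && worker.isAlive) Thread.sleep(pollMillis)

    if (killed.get() && result.get().isEmpty) {
      worker.interrupt() // best effort: unblock the helper thread
      throw new TaskKilled("killed while fetching keys")
    }

    worker.join()
    val keys = result.get() match {
      case Some(fetched) => fetched.get // rethrows a failure raised by `fetch`
      case None          => throw new IllegalStateException("fetch produced no result")
    }
    new FlagCheckingIterator(killed, keys.iterator)
  }
}
```

The polling loop covers a kill that arrives while the keys are still being fetched, and the wrapping iterator covers a kill that arrives later, while the keys are being consumed.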
Massive data insertion
Redis keeps all of its data in memory. It also supports persistence and can back the data up to disk. When large amounts of data are inserted, some of the data will obviously be lost if Redis does not have enough memory. What puzzles users is the following: when Redis's used memory exceeds the maximum available memory, Redis reports the error "OOM command not allowed when used memory > 'maxmemory'". But when the data written by an insert job is larger than the memory available in Redis, part of the data is lost and no error is reported.
When either the Jedis client or the Redis server runs out of memory during the insert, the write does not succeed, yet no response is returned. The only solution we have found so far is to increase the memory available to Redis once data loss is observed.
Spark-Redis is an open source project that is not yet widely used and, unlike Spark JDBC, has not been commercialized. So Spark-Redis still has a lot of problems. I believe that with the efforts of its committers, Spark-Redis will become more and more powerful.
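Since the insert itself gives no feedback, one possible precaution is to check the memory headroom before starting a large insert. The sketch below is an assumption of ours, not part of Spark-Redis. It parses the text of a Redis INFO memory reply, which could be obtained for example with Jedis's info("memory"), and compares the used_memory and maxmemory fields. The helper names and the caller-supplied size estimate are hypothetical.

```scala
import scala.util.Try

object RedisMemoryCheck {
  // Parses the "key:value" lines of an INFO reply.
  // Section headers such as "# Memory" and blank lines are skipped.
  def parseInfo(info: String): Map[String, String] =
    info.split("\r?\n").iterator
      .map(_.trim)
      .filter(line => line.nonEmpty && !line.startsWith("#"))
      .map(_.split(":", 2))
      .collect { case Array(key, value) => key -> value }
      .toMap

  private def longField(info: Map[String, String], name: String): Option[Long] =
    info.get(name).flatMap(value => Try(value.toLong).toOption)

  // Bytes left before maxmemory is reached.
  // None when maxmemory is 0 (no limit configured) or a field is missing.
  def headroomBytes(info: Map[String, String]): Option[Long] =
    for {
      used <- longField(info, "used_memory")
      max  <- longField(info, "maxmemory")
      if max > 0
    } yield max - used

  // True when no limit is known or the estimated payload fits into the headroom.
  def canInsert(info: Map[String, String], estimatedBytes: Long): Boolean =
    headroomBytes(info).forall(_ >= estimatedBytes)
}
```

The estimate is necessarily rough, because Redis adds per-key overhead that the raw payload size does not include, so a safety margin on top of estimatedBytes is advisable.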