How to Use a Machine Learning Model to Predict on PySpark Streaming Data


This article explains in detail how to use a machine learning model to make predictions on PySpark streaming data. It is shared as a reference, and by the end you should have a working understanding of the relevant concepts.

Overview

Streaming data is a relatively new concept in the machine learning field

Learn how to use a machine learning model (such as logistic regression) to make predictions on streaming data with PySpark

We will cover the basics of streaming data and Spark Streaming, and then dive into the implementation

Introduction

Imagine that every second, more than 8,500 tweets are sent, more than 900 photos are uploaded to Instagram, more than 4,200 Skype calls are made, more than 78,000 Google searches are performed, and more than 2 million emails are sent (according to real-time internet statistics).

We are generating data at an unprecedented pace and scale. It is a great time to be working in data science! But along with this flood of data come complex challenges.

The main questions are: how do we collect data at this scale, and how do we ensure that our machine learning pipelines keep producing results once the data is generated and collected? These are major challenges facing the industry, and they are why the concept of streaming data is getting more and more attention in organizations.

Adding the ability to handle streaming data will greatly expand your current data science skill set. It is a much-needed skill in the industry, and mastering it will help you land your next data science role.

So in this article we will learn what streaming data is, understand the fundamentals of Spark Streaming, and then work through an industry-relevant dataset to implement streaming with Spark.

What is streaming data?

Look at the social media numbers above: the scale of the data we are dealing with is staggering. Can you imagine what it takes to store all of it? It is a complicated process! So before we get into the Spark side of this article, let's take a moment to understand what streaming data actually is.

Streaming data has no discrete beginning or end. It is generated every second by thousands of data sources and needs to be processed and analyzed as soon as possible. A considerable amount of streaming data must be handled in near real time, Google search results being one example.

We also know that insights are most valuable close to the event that produced them and tend to lose value over time. Take a sporting event: we want real-time analysis and real-time statistics so that we can enjoy the game in the moment, right?

Spark Streaming Fundamentals

Spark Streaming is an extension of the core Spark API that enables scalable and fault-tolerant processing of real-time data streams.

Before jumping to the implementation, let's look at the different components of Spark Streaming.

Discretized streams (DStreams)

A discretized stream, or DStream, represents a continuous stream of data. The stream is either received directly from a source or produced after we apply some processing to the raw data.

The first step in building a streaming application is to define the batch interval at which we collect data from the source. If the batch interval is 2 seconds, the incoming data is collected and stored in an RDD every 2 seconds. The continuous sequence of these RDDs is an immutable discretized stream, which Spark can work with like a distributed dataset.
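
As a minimal sketch (the application name, hostname, and port here are placeholders rather than part of the article's project), this is roughly what defining a 2-second batch interval looks like in PySpark:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="DStreamBasics")

# collect the incoming data into a new RDD every 2 seconds
ssc = StreamingContext(sc, batchDuration=2)

# the resulting DStream is the continuous sequence of those 2-second RDDs
lines = ssc.socketTextStream("localhost", 9991)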

Think of a typical data science project. In the preprocessing stage we transform variables: converting categorical variables into numerical ones, removing outliers, and so on. Spark keeps a history of all the transformations we define on any data, so whenever a failure occurs it can retrace that chain of transformations and regenerate the results.

We want a Spark application to run 24x7 and to recover as quickly as possible whenever a failure occurs. But when dealing with large-scale data, recomputing every transformation after an error is, as you can imagine, very expensive.

Caching

Here is one way to meet this challenge: caching. We temporarily store the results of the transformations defined on the data, so that when an error occurs we do not have to recompute those transformations over and over again.

DStreams allow us to keep the streaming data in memory, which is helpful when we want to run multiple operations on the same data.
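
For illustration, here is a small sketch (the socket source and the two operations are placeholders) of caching a DStream so that several computations reuse the same received batches instead of recomputing them:

# keep the received stream in memory
lines = ssc.socketTextStream("localhost", 9991)
lines.cache()

# both computations below reuse the cached batches
word_counts = lines.flatMap(lambda line: line.split(" ")).count()
line_lengths = lines.map(lambda line: len(line))
word_counts.pprint()
line_lengths.pprint()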

Checkpointing

Caching is very useful when used correctly, but it requires a lot of memory. Not everyone has hundreds of machines with 128 GB of RAM each to cache everything.

This is where the concept of checkpointing comes in.

Checkpointing is another technique for preserving the results of transformed data. It stores the state of the running application, from time to time, in reliable storage such as HDFS. It is, however, slower and less flexible than caching.

Checkpointing is useful when we have streaming data, because a transformation result may depend on previous transformation results and needs to be retained for later use. We also checkpoint metadata, such as the configuration used to create the stream and the results of a set of DStream operations.
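
A minimal sketch of enabling checkpointing (the HDFS path and the running word-count example are illustrative, not part of the article's project):

ssc = StreamingContext(sc, batchDuration=3)

# periodically save the application state to reliable storage such as HDFS
ssc.checkpoint("hdfs://namenode:8020/spark_checkpoints")

# stateful operations like updateStateByKey rely on checkpointing,
# because each new result depends on results from previous batches
def update_count(new_values, running_count):
    return sum(new_values) + (running_count or 0)

pairs = ssc.socketTextStream("localhost", 9991).map(lambda word: (word, 1))
running_counts = pairs.updateStateByKey(update_count)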

Shared variables in streaming data

Sometimes we need to define functions such as map, reduce, or filter for our Spark application, and those functions are executed across multiple machines in the cluster. The variables used inside such a function are copied to each machine (node).

Each node has its own executor, so we need something that relates these separate copies of the variables to one another.

For example, suppose our Spark application runs on 100 different nodes, capturing Instagram images posted by people from different countries, and we need a count of a particular hashtag mentioned in their posts.

The executor on each node computes results only for the data that lives on that node, so we need a way for the nodes to share information and give us aggregate results. In Spark, shared variables help us overcome this problem.

Accumulator variable

Use cases such as counting the number of errors that occurred, the number of blank log lines, or the number of requests received from a particular country can all be handled with an accumulator.

The executor on each node sends its data back to the driver process, which updates the value of the accumulator variable. Accumulators only support associative and commutative operations; for example, sum and maximum are valid, while mean is not.
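
As a hedged sketch (the log file and blank-line example are illustrative), an accumulator that counts blank lines could look like this:

# create an accumulator on the driver, starting at zero
blank_lines = sc.accumulator(0)

def count_blanks(line):
    # executors add to the accumulator; only the driver can read its value
    if line.strip() == "":
        blank_lines.add(1)
    return line

logs = sc.textFile("server_logs.txt").map(count_blanks)
logs.count()  # an action triggers the computation
print("Blank lines seen:", blank_lines.value)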

Broadcast variable

When we work with location data, such as a mapping from city names to postal codes, that data is fixed. If a transformation running on some node needs this data every time, it should not have to request it from the driver each time, because that would be too expensive.

Instead, we can store a copy of the data on each node. Variables of this kind are called broadcast variables.

Broadcast variables let the programmer cache a read-only variable on every machine. Spark normally distributes broadcast variables automatically using an efficient broadcast algorithm, but we can also define them ourselves when tasks across multiple stages need the same data.
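
Here is a small illustrative sketch (the city-to-postal-code mapping is made up) of a broadcast variable keeping a read-only lookup table on every machine:

# ship one read-only copy of the lookup table to every machine
city_to_zip = sc.broadcast({"Mumbai": "400001", "Delhi": "110001", "Chennai": "600001"})

cities = sc.parallelize(["Mumbai", "Chennai", "Pune"])

# each executor reads its local copy via .value instead of asking the driver
zips = cities.map(lambda city: (city, city_to_zip.value.get(city, "unknown")))
print(zips.collect())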

Sentiment Analysis on Streaming Data with PySpark

Time to fire up your favorite IDE! In this section we will write some code and get hands-on with streaming data.

We will use a real dataset here. The goal is to detect hate speech on Twitter; for simplicity, we say a tweet contains hate speech if it is racist or sexist.

So the task is to classify racist or sexist tweets apart from other tweets. We will use a training sample of tweets and labels, where label '1' means the tweet is racist/sexist and label '0' means it is not.

Why is this project relevant to stream processing? Because social media platforms receive huge volumes of streaming data in the form of comments and status updates, and this project can help limit what gets published to the public.

You can view the full problem statement here: Practice Problem: Twitter Sentiment Analysis (https://datahack.analyticsvidhya.com/contest/practice-problem-twitter-sentiment-analysis/?utm_source=blog&utm_medium=streaming-data-pyspark-machine-learning-model). Let's get started!

Set up project workflow

Modeling: we will build a logistic regression model pipeline to classify whether a tweet contains hate speech. The focus is not on building a highly accurate classifier, but on seeing how to use any model to return results on streaming data

Initialize the Spark Streaming context: once the model is built, we define the hostname and port number from which to receive the streaming data

Stream the data: next, we will send tweets from a netcat server on the defined port, and the Spark Streaming API will receive the data after the specified batch duration

Predict and return results: once we receive the tweet text, we pass it into the machine learning pipeline we created and return the predicted sentiment from the model

That is our workflow in brief; now let's implement it step by step.

Training the data to build the logistic regression model

We have tweet data in a CSV file mapped to labels. We will use a logistic regression model to predict whether a tweet contains hate speech; if so, the model predicts 1 (and 0 otherwise).

You can download the dataset and code (https://github.com/lakshay-arora/PySpark/tree/master/spark_streaming) here.

First, we need to define the schema of the CSV file; otherwise Spark will treat every column's data type as a string. We read the data and take a look:

# import the required libraries
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row

# initialize the Spark session
sc = SparkContext(appName="PySparkShell")
spark = SparkSession(sc)

# define the schema
my_schema = tp.StructType([
    tp.StructField(name='id', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='label', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='tweet', dataType=tp.StringType(), nullable=True)
])

# read the dataset
my_data = spark.read.csv('twitter_sentiments.csv', schema=my_schema, header=True)

# view the data
my_data.show(5)

# print the schema of the DataFrame
my_data.printSchema()

Define the machine learning pipeline

Now that we have the data in a Spark DataFrame, we need to define the different stages that will transform the data, and then use them to get predicted labels from our model.

In the first stage, we use RegexTokenizer to convert the tweet text into a list of tokens. Then we remove the stop words from that list and create word vectors. In the final stage, we use these word vectors to build a logistic regression model and get the predicted sentiment.

Remember, the focus is not on building a highly accurate classification model, but on seeing how to get predictions on streaming data from the model.

# define stage 1: tokenize the tweet text
stage_1 = RegexTokenizer(inputCol='tweet', outputCol='tokens', pattern='\\W')

# define stage 2: remove the stop words
stage_2 = StopWordsRemover(inputCol='tokens', outputCol='filtered_words')

# define stage 3: create a word vector of size 100
stage_3 = Word2Vec(inputCol='filtered_words', outputCol='vector', vectorSize=100)

# define stage 4: logistic regression model
model = LogisticRegression(featuresCol='vector', labelCol='label')

Set up our machine learning pipeline

Let's add the stages to a Pipeline object, which will perform these transformations in sequence. We fit the pipeline to the training dataset, and from then on, whenever we have a new tweet, we only need to pass it through the pipeline object to transform the data and get a prediction:

# set up the pipeline
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, model])

# fit the pipeline on the training data
pipelineFit = pipeline.fit(my_data)

Stream data and return results

Suppose we receive hundreds of comments per second and want to keep the platform clean by blocking users from posting comments that contain hate speech. Every time a new piece of text arrives, we pass it into the pipeline and get the predicted sentiment.

We will define a function get_prediction that removes empty sentences and creates a DataFrame in which each row contains a tweet.

We then initialize the Spark Streaming context and define a batch duration of 3 seconds, which means we make predictions on the data received every 3 seconds:

import sys  # needed for the command-line hostname and port arguments used below

# define a function to compute the sentiment of the collected tweets
def get_prediction(tweet_text):
    try:
        # filter the tweets whose length is greater than 0
        tweet_text = tweet_text.filter(lambda x: len(x) > 0)
        # create a column named "tweet"; each row will contain a tweet
        rowRdd = tweet_text.map(lambda w: Row(tweet=w))
        # create a Spark DataFrame
        wordsDataFrame = spark.createDataFrame(rowRdd)
        # transform the data using the pipeline and get the predicted sentiment
        pipelineFit.transform(wordsDataFrame).select('tweet', 'prediction').show()
    except:
        print('No data')

# initialize the streaming context with a batch duration of 3 seconds
ssc = StreamingContext(sc, batchDuration=3)

# create a DStream that connects to hostname:port, for example localhost:9991
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

# split the tweet text on the keyword 'TWEET_APP' so we can identify one tweet from the next
words = lines.flatMap(lambda line: line.split('TWEET_APP'))

# get the predicted sentiment for the received tweets
words.foreachRDD(get_prediction)

# start the computation
ssc.start()

# wait for the computation to terminate
ssc.awaitTermination()

Run the program from a terminal along with netcat (a utility that sends data to a defined hostname and port number). You can start a TCP connection with the following command:

nc -lk port_number

That's it for how to use a machine learning model to make predictions on PySpark streaming data. I hope the content above has been helpful and that you learned something new from it. If you found the article useful, please share it so that more people can see it.
