2025-02-24 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article explains how to build a C# framework for synchronization and communication between threads and processes. It analyzes the problem in detail and presents a solution, in the hope of giving readers who face the same problem a simpler, workable approach.
Download threadmsg_demo.zip ~ 41KB
Download threadmsg_src.zip ~ 65KB
0. Background introduction
Microsoft provides a variety of practical thread synchronization mechanisms in the .NET Framework, including the Monitor class and reader-writer locks. Cross-process synchronization, however, is still poorly supported, and there is no convenient way to pass messages between threads and processes. Client/server (C/S) and SOA architectures, or the producer/consumer pattern, often need to deliver messages. To fill this gap, I wrote a self-contained framework for cross-thread and cross-process synchronization and communication. It includes semaphores, mailboxes, memory-mapped files, blocking channels, and simple message flow controllers. All classes mentioned in this article belong to the same open source library project (BSD license), which you can download from www.cdrnet.net/projects/threadmsg/.
The goals of this framework are:
1. Encapsulation: a thread that sends a message through an MSMQ message queue does not need to care whether the message goes to another thread or to another machine.
2. Simplicity: sending a message to another process takes a single method call.
Note: I have removed XML comments from all the code in this article to save space. If you want to know more about these methods and parameters, please refer to the code in the attachment.
1. Let's start with a simple example.
With this library, cross-process messaging becomes very simple. Here is a small example: a console program that runs as either a sender or a receiver, depending on its arguments. In the sender, you type some text and press Return to send it to the mailbox; the receiver displays every message it takes from the mailbox. You can run any number of senders and receivers, but each message is received by exactly one receiver.
    using System;

    [Serializable]
    struct Message
    {
        public string Text;
    }

    class Test
    {
        IMailBox mail;

        public Test()
        {
            // Sender and receiver processes use the same mailbox name.
            mail = new ProcessMailBox("TMProcessTest", 1024);
        }

        public void RunWriter()
        {
            Console.WriteLine("Writer started");
            Message msg;
            while (true)
            {
                msg.Text = Console.ReadLine();
                if (msg.Text.Equals("exit"))
                    break;
                mail.Content = msg; // blocks while the mailbox is still full
            }
        }

        public void RunReader()
        {
            Console.WriteLine("Reader started");
            while (true)
            {
                Message msg = (Message)mail.Content; // blocks until a message arrives
                Console.WriteLine(msg.Text);
            }
        }

        [STAThread]
        static void Main(string[] args)
        {
            Test test = new Test();
            if (args.Length > 0)
                test.RunWriter(); // any argument selects sender mode
            else
                test.RunReader();
        }
    }
Once the mailbox is created (ProcessMailBox in the code above), you receive a message by reading the Content property and send one by assigning to it. Reading blocks the current thread while the mailbox is empty; writing blocks the current thread while the mailbox still holds an unread message. Thanks to this blocking, the whole program is interrupt-driven and does not waste CPU time (no polling is required). Messages can be of any type that supports serialization (Serializable).
What happens behind the scenes is somewhat more involved. Messages are passed through memory-mapped files, which is currently the only way to share memory across processes; in this case we only create virtual files in the pagefile. Access to the virtual file is synchronized with win32 semaphores. A message is first serialized to binary and then written to the file, which is why the Serializable attribute must be declared. Both memory-mapped files and win32 semaphores require calls into the NT kernel. With the help of the Marshal class in the .NET Framework, we can avoid writing unsafe code. More details follow below.
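The blocking behaviour of the Content property described above can be sketched inside a single process with two counting semaphores, one tracking the free slot and one tracking the pending message. This is only an illustration under stated assumptions: OneSlotMailBox and MailBoxDemo are hypothetical names, the sketch uses the built-in SemaphoreSlim class rather than the library's own semaphores, and it leaves out the serialization and the memory-mapped file that the cross-process ProcessMailBox relies on.

```csharp
using System;
using System.Threading;

// Hypothetical in-process mailbox; not the library's ProcessMailBox.
public sealed class OneSlotMailBox<T>
{
    private readonly SemaphoreSlim empty = new SemaphoreSlim(1, 1); // 1 = slot is free
    private readonly SemaphoreSlim full = new SemaphoreSlim(0, 1);  // 1 = slot holds a message
    private T slot;

    public T Content
    {
        get
        {
            full.Wait();          // blocks while the mailbox is empty
            T value = slot;
            slot = default(T);
            empty.Release();      // let the next sender in
            return value;
        }
        set
        {
            empty.Wait();         // blocks while the previous message is unread
            slot = value;
            full.Release();       // wake one waiting receiver
        }
    }
}

public static class MailBoxDemo
{
    // Sends one message from the calling thread to a reader thread.
    public static string RoundTrip(string text)
    {
        var box = new OneSlotMailBox<string>();
        string received = null;
        var reader = new Thread(() => received = box.Content);
        reader.Start();           // the reader blocks until a message arrives
        box.Content = text;       // hand the message over
        reader.Join();
        return received;
    }
}
```

Neither side polls: each thread sleeps inside Wait until the other side calls Release, which is the property the article attributes to the mailbox.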
2. Cross-thread / process synchronization in .NET
Thread and process communication requires shared memory or some other built-in mechanism to send and receive data. Even with shared memory, a set of synchronization primitives is needed to make concurrent access safe.
All threads within the same process share a common logical address space (the heap). Separate processes, by contrast, have not been able to share memory directly since Windows 2000. They can, however, read and write the same file. The WinAPI provides several system calls that map a file into a process's logical address space, and that give access to virtual files in the pagefile backed by a system kernel object (a section). Whether the shared resource is a heap or a file, concurrent access can lead to inconsistent data. Let's briefly discuss how to guarantee the order of thread and process operations and keep the data consistent.
2.1 Thread synchronization
The .NET Framework and C# provide convenient and intuitive thread synchronization, namely the Monitor class and the lock statement (the mutexes of the .NET Framework are not discussed in this article). Although this article presents other options for thread synchronization, we recommend using the lock statement.
    void Work1()
    {
        NonCriticalSection1();
        Monitor.Enter(this);
        try
        {
            CriticalSection();
        }
        finally
        {
            Monitor.Exit(this);
        }
        NonCriticalSection2();
    }

    void Work2()
    {
        NonCriticalSection1();
        lock (this)
        {
            CriticalSection();
        }
        NonCriticalSection2();
    }
Work1 and Work2 are equivalent. In C#, many people like the second method because it is shorter and less error-prone.
2.2 Cross-thread semaphores
The semaphore is one of the classic building blocks of synchronization (introduced by Edsger Dijkstra). A semaphore is an object with a counter and two operations: acquire (also called P or wait) and release (also called V or signal). Acquire blocks if the counter is 0 and otherwise decrements it by one; release increments the counter by one and never blocks. Although the principle is simple, implementing it is a little tricky. Fortunately, the built-in Monitor class already supports blocking and can be used to implement semaphores.
    public sealed class ThreadSemaphore : ISemaphore
    {
        private int counter;
        private readonly int max;

        public ThreadSemaphore() : this(0, int.MaxValue) {}
        public ThreadSemaphore(int initial) : this(initial, int.MaxValue) {}
        public ThreadSemaphore(int initial, int max)
        {
            this.counter = Math.Min(initial, max);
            this.max = max;
        }

        public void Acquire()
        {
            lock (this)
            {
                counter--;
                // A negative counter means the caller has to wait for a Release.
                if (counter < 0 && !Monitor.Wait(this))
                    throw new SemaphoreFailedException();
            }
        }

        public void Acquire(TimeSpan timeout)
        {
            lock (this)
            {
                counter--;
                if (counter < 0 && !Monitor.Wait(this, timeout))
                    throw new SemaphoreFailedException();
            }
        }

        public void Release()
        {
            lock (this)
            {
                if (counter >= max)
                    throw new SemaphoreFailedException();
                // A negative counter means at least one thread is waiting.
                if (counter < 0)
                    Monitor.Pulse(this);
                counter++;
            }
        }
    }
Semaphores are most useful in complex blocking scenarios, such as the channel that we will discuss later. You can also use a semaphore to enforce mutual exclusion for a critical section (as in Work3 below), but I still recommend the built-in lock statement, as in Work2 above.
Note that semaphores can be dangerous if used improperly. The rule is: if acquiring the semaphore fails, never call release; if acquiring succeeds, always release the semaphore, no matter what error occurs afterwards. Follow this rule and your synchronization will be correct. The finally block in Work3 ensures that the semaphore is released properly. Note: the acquire call (s.Acquire()) must be placed outside the try block so that release is not called when the acquire fails.
    ThreadSemaphore s = new ThreadSemaphore(1);

    void Work3()
    {
        NonCriticalSection1();
        s.Acquire();
        try
        {
            CriticalSection();
        }
        finally
        {
            s.Release();
        }
        NonCriticalSection2();
    }
2.3 Cross-process semaphores
To coordinate access to the same resource by different processes, we need the concepts discussed above. Unfortunately, the Monitor class in .NET cannot be used across processes. The kernel semaphore objects provided by the win32 API, however, can be used for cross-process synchronization. Robin Galloway-Lunn describes how to map win32 semaphores to .NET (see Using Win32 Semaphores in C#). Our implementation is similar:
    // P/Invoke declarations, members of the NTKernel class
    [DllImport("kernel32", EntryPoint="CreateSemaphore", SetLastError=true, CharSet=CharSet.Unicode)]
    internal static extern uint CreateSemaphore(SecurityAttributes auth, int initialCount, int maximumCount, string name);

    [DllImport("kernel32", EntryPoint="WaitForSingleObject", SetLastError=true, CharSet=CharSet.Unicode)]
    internal static extern uint WaitForSingleObject(uint hHandle, uint dwMilliseconds);

    // Win32 BOOL is a 4-byte integer, so it is marshalled as UnmanagedType.Bool.
    [DllImport("kernel32", EntryPoint="ReleaseSemaphore", SetLastError=true, CharSet=CharSet.Unicode)]
    [return: MarshalAs(UnmanagedType.Bool)]
    internal static extern bool ReleaseSemaphore(uint hHandle, int lReleaseCount, out int lpPreviousCount);

    [DllImport("kernel32", EntryPoint="CloseHandle", SetLastError=true, CharSet=CharSet.Unicode)]
    [return: MarshalAs(UnmanagedType.Bool)]
    internal static extern bool CloseHandle(uint hHandle);

    public class ProcessSemaphore : ISemaphore, IDisposable
    {
        private uint handle;
        private readonly uint interruptReactionTime;

        public ProcessSemaphore(string name) : this(name, 0, int.MaxValue, 500) {}
        public ProcessSemaphore(string name, int initial) : this(name, initial, int.MaxValue, 500) {}
        public ProcessSemaphore(string name, int initial, int max, int interruptReactionTime)
        {
            this.interruptReactionTime = (uint)interruptReactionTime;
            this.handle = NTKernel.CreateSemaphore(null, initial, max, name);
            if (handle == 0)
                throw new SemaphoreFailedException();
        }

        public void Acquire()
        {
            while (true)
            {
                // Looped 0.5s timeout to make NT-blocked threads interruptible.
                uint res = NTKernel.WaitForSingleObject(handle, interruptReactionTime);
                try
                {
                    System.Threading.Thread.Sleep(0);
                }
                catch (System.Threading.ThreadInterruptedException)
                {
                    if (res == 0)
                    {
                        // Rollback: give back the semaphore we just acquired.
                        int previousCount;
                        NTKernel.ReleaseSemaphore(handle, 1, out previousCount);
                    }
                    throw;
                }
                if (res == 0)
                    return;
                if (res != 258) // 258 = WAIT_TIMEOUT: keep looping on a timeout
                    throw new SemaphoreFailedException();
            }
        }

        public void Acquire(TimeSpan timeout)
        {
            uint milliseconds = (uint)timeout.TotalMilliseconds;
            if (NTKernel.WaitForSingleObject(handle, milliseconds) != 0)
                throw new SemaphoreFailedException();
        }

        public void Release()
        {
            int previousCount;
            if (!NTKernel.ReleaseSemaphore(handle, 1, out previousCount))
                throw new SemaphoreFailedException();
        }

        #region IDisposable Member
        public void Dispose()
        {
            if (handle != 0)
            {
                if (NTKernel.CloseHandle(handle))
                    handle = 0;
            }
        }
        #endregion
    }
One point is important: win32 semaphores can be named. This allows other processes to create a handle to the same semaphore by name. To make a blocked thread interruptible, we use a (poor) workaround: a timeout combined with Sleep(0). We need interrupts in order to shut a thread down safely. It is better still to make sure that no thread is blocked before the semaphore is released, so that the program can free its resources and exit cleanly.
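For comparison, the same named kernel object can be reached without P/Invoke through the built-in System.Threading.Semaphore class. The following is a minimal sketch under stated assumptions, not the library's ProcessSemaphore: NamedSemaphoreDemo and WithSemaphore are hypothetical names, and named semaphores are only supported on Windows (passing null creates an unnamed, process-local semaphore instead).

```csharp
using System;
using System.Threading;

public static class NamedSemaphoreDemo
{
    // Runs the action while holding a semaphore with one permit.
    // The helper and its parameter names are illustrative assumptions.
    public static void WithSemaphore(string name, Action criticalSection)
    {
        bool createdNew;
        // If a semaphore with this name already exists, the constructor
        // opens it and sets createdNew to false.
        using (var semaphore = new Semaphore(1, 1, name, out createdNew))
        {
            semaphore.WaitOne();        // acquire outside the try block
            try
            {
                criticalSection();
            }
            finally
            {
                semaphore.Release();    // only reached after a successful acquire
            }
        }
    }
}
```

A second process that calls WithSemaphore with the same name competes for the same permit, which mirrors the Work3 pattern across process boundaries.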
You may have noticed that the cross-thread and cross-process semaphores implement the same interface. All related classes follow this pattern to achieve the encapsulation mentioned in the background section. Note: for performance reasons, do not use the cross-process semaphore where only threads are involved, and do not use the cross-thread implementation in single-threaded code.
3. Shared memory across processes: memory-mapped files
We have now synchronized access to shared resources across threads and processes. Sending and receiving messages, however, also requires a shared resource to hold the data. For threads, a class member variable is enough. For processes, we need the memory-mapped files (MMF) provided by the win32 API. Using an MMF is similar to using win32 semaphores. We first call CreateFileMapping to create a handle to the memory-mapped file:
    // P/Invoke declarations, members of the NTKernel class
    [DllImport("Kernel32.dll", EntryPoint="CreateFileMapping", SetLastError=true, CharSet=CharSet.Unicode)]
    internal static extern IntPtr CreateFileMapping(uint hFile, SecurityAttributes lpAttributes, uint flProtect, uint dwMaximumSizeHigh, uint dwMaximumSizeLow, string lpName);

    [DllImport("Kernel32.dll", EntryPoint="MapViewOfFile", SetLastError=true, CharSet=CharSet.Unicode)]
    internal static extern IntPtr MapViewOfFile(IntPtr hFileMappingObject, uint dwDesiredAccess, uint dwFileOffsetHigh, uint dwFileOffsetLow, uint dwNumberOfBytesToMap);

    // Win32 BOOL is a 4-byte integer, so it is marshalled as UnmanagedType.Bool.
    [DllImport("Kernel32.dll", EntryPoint="UnmapViewOfFile", SetLastError=true, CharSet=CharSet.Unicode)]
    [return: MarshalAs(UnmanagedType.Bool)]
    internal static extern bool UnmapViewOfFile(IntPtr lpBaseAddress);

    public static MemoryMappedFile CreateFile(string name, FileAccess access, int size)
    {
        if (size < 0)
            throw new ArgumentException("Size must not be negative", "size");

        // 0xFFFFFFFF (-1) as the file handle selects a pagefile-backed mapping.
        IntPtr fileMapping = NTKernel.CreateFileMapping(0xFFFFFFFFu, null, (uint)access, 0, (uint)size, name);
        if (fileMapping == IntPtr.Zero)
            throw new MemoryMappingFailedException();

        return new MemoryMappedFile(fileMapping, size, access);
    }
We want to use a virtual file in the pagefile directly, so we pass -1 (0xFFFFFFFF) as the file handle when creating the mapping. We also specify the required size and a name, which lets other processes access the same mapping by that name. Once the memory-mapped file exists, we can map different parts of it (given by an offset and a size in bytes) into our process's address space with the MapViewOfFile system call:
    public MemoryMappedFileView CreateView(int offset, int size, MemoryMappedFileView.ViewAccess access)
    {
        if (this.access == FileAccess.ReadOnly && access == MemoryMappedFileView.ViewAccess.ReadWrite)
            throw new ArgumentException("Only read access to views allowed on files without write access", "access");
        if (offset < 0)
            throw new ArgumentException("Offset must not be negative", "offset");
        if (size < 0)
            throw new ArgumentException("Size must not be negative", "size");

        IntPtr mappedView = NTKernel.MapViewOfFile(fileMapping, (uint)access, 0, (uint)offset, (uint)size);
        return new MemoryMappedFileView(mappedView, size, access);
    }
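Later versions of .NET ship a managed wrapper for the same mechanism in the System.IO.MemoryMappedFiles namespace. The sketch below is an illustration under stated assumptions, not the library's MemoryMappedFile class (which happens to share the name): MappedFileDemo and WriteThenRead are hypothetical names, and named mappings are only supported on Windows, so the helper also accepts null for an unnamed mapping.

```csharp
using System;
using System.IO.MemoryMappedFiles;

public static class MappedFileDemo
{
    // Writes a value through one view and reads it back through a second,
    // read-only view of the same mapping (not backed by a file on disk).
    public static int WriteThenRead(string mapName, int value)
    {
        const int capacity = 1024;  // size of the whole mapping in bytes
        const int offset = 16;      // byte offset of both views within the mapping

        using (var file = MemoryMappedFile.CreateNew(mapName, capacity))
        using (var writer = file.CreateViewAccessor(offset, sizeof(int)))
        using (var reader = file.CreateViewAccessor(offset, sizeof(int), MemoryMappedFileAccess.Read))
        {
            writer.Write(0, value);      // position 0 is relative to the view, not the mapping
            return reader.ReadInt32(0);
        }
    }
}
```

On Windows, a second process could reach the same memory with MemoryMappedFile.OpenExisting and the same name, which corresponds to passing the name to CreateFileMapping above.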
In unsafe code, we could cast the returned pointer to whatever type we need. Since we want to avoid unsafe code, we use the Marshal class to read and write the data instead. The offset parameter gives the position at which to start reading or writing, relative to the base address of the mapped view.
    public byte ReadByte(int offset)
    {
        return Marshal.ReadByte(mappedView, offset);
    }

    public void WriteByte(byte data, int offset)
    {
        Marshal.WriteByte(mappedView, offset, data);
    }

    public int ReadInt32(int offset)
    {
        return Marshal.ReadInt32(mappedView, offset);
    }

    public void WriteInt32(int data, int offset)
    {
        Marshal.WriteInt32(mappedView, offset, data);
    }

    public void ReadBytes(byte[] data, int offset)
    {
        for
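The offset-based Marshal calls can be tried without a mapped view. The sketch below is an assumption-laden stand-in: MarshalDemo and its methods are hypothetical names, and a buffer from Marshal.AllocHGlobal takes the place of the pointer returned by MapViewOfFile. It also shows Marshal.Copy as one possible way to move a whole byte array at once; whether the library does it this way is not stated in the text above.

```csharp
using System;
using System.Runtime.InteropServices;

public static class MarshalDemo
{
    // Writes an int at the given byte offset and reads it back.
    public static int RoundTripInt32(int value, int offset)
    {
        // Unmanaged buffer standing in for a mapped view (assumption for the demo).
        IntPtr buffer = Marshal.AllocHGlobal(offset + sizeof(int));
        try
        {
            Marshal.WriteInt32(buffer, offset, value);
            return Marshal.ReadInt32(buffer, offset);
        }
        finally
        {
            Marshal.FreeHGlobal(buffer);
        }
    }

    // Copies a byte array into the buffer at the given offset and back out.
    public static byte[] RoundTripBytes(byte[] data, int offset)
    {
        IntPtr buffer = Marshal.AllocHGlobal(offset + data.Length);
        try
        {
            IntPtr start = IntPtr.Add(buffer, offset);
            Marshal.Copy(data, 0, start, data.Length);     // managed -> unmanaged
            byte[] result = new byte[data.Length];
            Marshal.Copy(start, result, 0, data.Length);   // unmanaged -> managed
            return result;
        }
        finally
        {
            Marshal.FreeHGlobal(buffer);
        }
    }
}
```

No unsafe block or pointer arithmetic is needed; every access goes through an IntPtr plus a byte offset, as in the accessor methods above.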