Disruptor-NET和内存栅栏

Disruptor-NET算法(是一种无锁算法)需要我们自己实现某一种特定的内存操作的语义以保证算法的正确性。这时我们就需要显式的使用一些指令来控制内存操作指令的顺序以及其可见性定义。这种指令称为内存栅栏。

成都创新互联服务项目包括丰镇网站建设、丰镇网站制作、丰镇网页制作以及丰镇网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,丰镇网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到丰镇省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!

内存一致性模型需要在各种的程序与系统的各个层次上定义内存访问的行为。在机器码与的层次上,其定义将影响硬件的设计者以及机器码开发人员;而在高级语言层次上,其定义将影响高级语言开发人员以及编译器开发人员和硬件设计人员。即,内存操作的乱序在各个层次都是存在的。这里,所谓的程序的执行顺序有三种:

(1)程序顺序:指在特定CPU上运行的,执行内存操作的代码的顺序。这指的是编译好的程序二进制镜像中的指令的顺序。编译器并不一定严格按照程序的顺序进行二进制代码的编排。编译器可以按照既定的规则,在执行代码优化的时候打乱指令的执行顺序,也就是上面说的程序顺序。并且,编译器可以根据程序的特定行为进行性能优化,这种优化可能改变算法的形式与算法的执行复杂度。(例如将switch转化为表驱动序列)
(2)执行顺序:指在CPU上执行的独立的内存相关的代码执行的顺序。执行顺序和程序顺序可能不同,这种不同是编译器和CPU优化造成的结果。CPU在执行期(Runtime)根据自己的内存模型(跟编译器无关)打乱已经编译好了的指令的顺序,以达到程序的优化和最大限度的资源利用。

(3)感知顺序:指特定的CPU感知到他自身的或者其他CPU对内存进行操作的顺序。感知顺序和执行顺序可能还不一样。这是由于缓存优化或者内存优化系统造成的。

而最终的共享内存模型的表现形式是由这三种“顺序”共同决定的。即从源代码到最终执行进行了至少三个层次上的代码顺序调整,分别是由编译器和CPU完成的。我们上面提到,这种代码执行顺序的改变虽然在单线程程序中不会引发副作用,但是在多线程程序中,这种作用是不能够被忽略的,甚至可能造成完全错误的结果。因此,在多线程程序中,我们有时需要人为的限制内存执行的顺序。而这种限制是通过不同层次的内存栅栏完成的。

Thread.MemoryBarrier就是采用了CPU提供的某些特定的指令的内存栅栏,下面是msdn的解释【http://msdn.microsoft.com/zh-cn/library/vstudio/system.threading.thread.memorybarrier(v=vs.100).aspx】:

Thread.MemoryBarrier: 按如下方式同步内存访问:执行当前线程的处理器在对指令重新排序时,不能采用先执行 MemoryBarrier调用之后的内存访问,再执行 MemoryBarrier调用之前的内存访问的方式。

按照我个人的理解:就是写完数据之后,调用MemoryBarrier,数据就会立即刷新,另外在读取数据之前调用MemoryBarrier可以确保读取的数据是最新的,并且处理器对MemoryBarrier的优化小心处理。

        int _answer;
        bool _complete;

        void A()
        {
            _answer = 123;
            Thread.MemoryBarrier(); //在写完之后,创建内存栅栏
            _complete = true;
            Thread.MemoryBarrier();//在写完之后,创建内存栅栏
       }

        void B()
        {
            Thread.MemoryBarrier();//在读取之前,创建内存栅栏
            if (_complete)
            {
                Thread.MemoryBarrier();//在读取之前,创建内存栅栏
                Console.WriteLine(_answer);
            }
        }

Disruptor-NET正是通过Thread.MemoryBarrier 实现无锁和线程安全的内存操作,看下面是他的Atomic的Volatile类对常用数据类型的封装

Disruptor-NET和内存栅栏

        /// 
        /// An integer value that may be updated atomically
        /// 
        public struct Integer
        {
            private int _value;

            /// 
            /// Create a new  with the given initial value.
            /// 
            /// Initial value
            public Integer(int value)
            {
                _value = value;
            }

            /// 
            /// Read the value without applying any fence
            /// 
            /// The current value
            public int ReadUnfenced()
            {
                return _value;
            }

            /// 
            /// Read the value applying acquire fence semantic
            /// 
            /// The current value
            public int ReadAcquireFence()
            {
                var value = _value;
                Thread.MemoryBarrier();
                return value;
            }

            /// 
            /// Read the value applying full fence semantic
            /// 
            /// The current value
            public int ReadFullFence()
            {
                var value = _value;
                Thread.MemoryBarrier();
                return value;
            }

            /// 
            /// Read the value applying a compiler only fence, no CPU fence is applied
            /// 
            /// The current value
            [MethodImpl(MethodImplOptions.NoOptimization)]
            public int ReadCompilerOnlyFence()
            {
                return _value;
            }

            /// 
            /// Write the value applying release fence semantic
            /// 
            /// The new value
            public void WriteReleaseFence(int newValue)
            {
                _value = newValue;
                Thread.MemoryBarrier();
            }

            /// 
            /// Write the value applying full fence semantic
            /// 
            /// The new value
            public void WriteFullFence(int newValue)
            {
                _value = newValue;
                Thread.MemoryBarrier();
            }

            /// 
            /// Write the value applying a compiler fence only, no CPU fence is applied
            /// 
            /// The new value
            [MethodImpl(MethodImplOptions.NoOptimization)]
            public void WriteCompilerOnlyFence(int newValue)
            {
                _value = newValue;
            }

            /// 
            /// Write without applying any fence
            /// 
            /// The new value
            public void WriteUnfenced(int newValue)
            {
                _value = newValue;
            }

            /// 
            /// Atomically set the value to the given updated value if the current value equals the comparand
            /// 
            /// The new value
            /// The comparand (expected value)
            /// 
            public bool AtomicCompareExchange(int newValue, int comparand)
            {
                return Interlocked.CompareExchange(ref _value, newValue, comparand) == comparand;
            }

            /// 
            /// Atomically set the value to the given updated value
            /// 
            /// The new value
            /// The original value
            public int AtomicExchange(int newValue)
            {
                return Interlocked.Exchange(ref _value, newValue);
            }

            /// 
            /// Atomically add the given value to the current value and return the sum
            /// 
            /// The value to be added
            /// The sum of the current value and the given value
            public int AtomicAddAndGet(int delta)
            {
                return Interlocked.Add(ref _value, delta);
            }

            /// 
            /// Atomically increment the current value and return the new value
            /// 
            /// The incremented value.
            public int AtomicIncrementAndGet()
            {
                return Interlocked.Increment(ref _value);
            }

            /// 
            /// Atomically increment the current value and return the new value
            /// 
            /// The decremented value.
            public int AtomicDecrementAndGet()
            {
                return Interlocked.Decrement(ref _value);
            }

            /// 
            /// Returns the string representation of the current value.
            /// 
            /// the string representation of the current value.
            public override string ToString()
            {
                var value = ReadFullFence();
                return value.ToString();
            }
        }

 

深入浅出多线程系列之八:内存栅栏和volatile 关键字


分享文章:Disruptor-NET和内存栅栏
本文来源:http://pcwzsj.com/article/pdiiio.html