
[Task 기반 Programming]

- 병렬 처리 관련 알아두면 좋은 사전지식 ^^;

[Shared Memory] Vs [Distributed Memory]




Multi-core 프로세서는 보통 여러개의 Physical Core를 가지고 있다.
이러한 Multi-core를 활용 하려면 여러 프로세스를 실행 하거나, 또는 프로세스에서 Multi-thread를 실행해야 한다.

여기서 Physical Core(물리적 코어)는 우리가 흔히 알고 있는 컴퓨터에 물리적으로 존재하는 코어이며,

Logical-Core(논리적 코어)를 2개 이상 제공 할 수 있다.
Logical-Core(논리적 코어)를 Hardware Thread(하드웨어 쓰레드) 라고 한다.
보통 Intel Hyper-Threading Technology (HT 또는 HTT)가 적용된 마이크로 프로세서는 물리적 코어 보다 많은Hardware Threads를 제공 한다.

그렇다면 Software-thread란 무엇일까? 그것은 우리가 만든 프로그램이 실행되면서 부터 생성되는 Thread인 것이다.
이것은 한개 혹은 여러개가 생성 될 수 있다.


Physical Core(물리적 코어) : 실제 Cpu에 물리적으로 존재 하는 코어 이다.
Logical-Core(논리적 코어) 또는 Hardware Thread(하드웨어 쓰레드) : 물리적 코어의 성능을 높이기 위해 추상화 한 개념의 Core.(Hyper-Threading)
Software-Thread : 실행중인 프로그램(Process)에서 생성된 Thread

실제 프로그램에서 만들어진 Software-Thread가 OS 스케쥴러에 따라 Hardware-Thread에 할당 되어 실행 되는 것이다.
아래 그림을 보면 이해가 편할 것이다.



NUMA(non-uniform memory access)

TPL(Task-Parallel-Lib) 는 NUMA와 함께 작동 하도록 최적화 되어 있다고 함.
아래의 그림은 NUMA를 그림으로 간략 하게 표현 함.



Data Parallelism

우리는 Task를 직접 생성해서 사용 할 수도 있지만, 아래의 함수를 이용 할 수도 있다.

Parallel.For — 고정된 수의 독립적 For 루프 반복에 대한 병렬 실행을 제공한다.

var loopCnt = 100;
Parallel.For(1, loopCnt, (int i) =>


Parallel.ForEach — 고정된 수의 독립적인 For Each 루프 반복에 대해 병렬 실행.(Partitioner 지원)


Parallel.ForEach(Partitioner.Create(0, 100), range =>


Parallel.Invoke — 독립 작업의 잠재적 병렬 실행을 제공.

먼저 Parallel.Invoke 부터 살펴 보자. 이 기능은 동시에 독립적인 함수를 실행 시킬 때 사용 한다.
하지만 여러 함수를 동시에 실행 시켜도 실행이 완료되는 시점은 다를 수 있다.


static void Main(string[] args)
    () => Method_1(),
    () => Method_2(),
    () => Method_3(),
    () => Method_4());

결과는 아래와 같이 상황에 따라 다르게 발생 할 수 있다.




  1. 4개의 함수를 동시에 실행 시켰을때 가장 긴 시간을 기준으로 반환 하게 되며, 이것으로 인해 나머지 Thread들이
    유휴 상태가 될 수 있다.
  2. 단지 4개의 함수만 실행 되기 때문에 만약 8개의 Hardware-Thread가 존재 한다면, 4개의 쓰레드가 유휴 상태로
    남아 있게 된다.

- 예외 처리

병렬화 된 각 반복에서 호출되는 대리자 내부의 코드가 대리자 내부에서 캡처되지 않은 예외를 throw하면 예외 집합의 일부가 됩니다.
새로운 AggregateException 클래스는이 예외 집합을 처리 함. 
  Parallel.ForEach(partitioner, (range, state) =>
  	throw new Exception("test");
catch (AggregateException ex)
  foreach (Exception innerEx in ex.InnerExceptions)
    // Do something considering the innerEx Exception


- MaxDegreeOfParallelism

    TPL을 사용하면 새 ParallelOptions 클래스의 인스턴스를 만들고 MaxDegreeOfParallelism의 값을 변경하여

    최대 병렬 처리 수준을 지정할 수 있습니다. 

- CancellationToken

    병렬 처리 하는 중 작업을 취소 할 수 있는 옵션.

- TaskScheduler

    특별한 알고리즘을 사용 하지 않는 이상, 수정 할 일이 없음.

Task Parallelism

TPL은 새로운 작업 기반 프로그래밍 모델을 도입하여 저수준, 더 복잡하고 무거운 스레드로 작업하지 않고도 멀티 코어 성능을 애플리케이션 성능으로 변환합니다.
* Thread를 대체 하는 것은 아님.
아래의 그림을 보면 Task와 Thread의 관계를 알 수 있다.



Task 에 대해서 간단하게 정리해 보자.

1. Thread보다 경량화된 구조이며 Overhead가 적게 발생 한다.

2. 새로운 Thread를 만들면 큰 오버헤드가 발생 하지만, Task를 만들면 작업 대기열을 사용하여 적은 오버헤드 발생.

   이미 존재하는 Thread에 작업이 할당 된다.

3. 기본 작업 스케줄러는 Thread Pool 엔진에 의존 한다.


Task 작업 취소 (CancellationTokenSource and CancellationToken)

class Program
        static void Main(string[] args)
            var cts = new System.Threading.CancellationTokenSource();
            var ct = cts.Token;

            var t1 = Task.Factory.StartNew(
                () => CancelTest1(ct), ct);

            var t2 = Task.Factory.StartNew(
                () => CancelTest2(ct), ct);

            Thread.Sleep(1000 * 1);

                Task.WaitAll(new Task[] {t1, t2});

            catch (Exception e)

            if (t1.IsCanceled)
                Console.WriteLine("The task running CancelTest1 was cancelled.");
            if (t2.IsCanceled)
                Console.WriteLine("The task running CancelTest2 was cancelled.");



        private static void CancelTest1(CancellationToken ct)
            for (int i = 0; i < 10; i++)
                Task.Delay(1000 * 1).Wait();
                Console.WriteLine(nameof(CancelTest1) + "  Index " + i);

        private static void CancelTest2(CancellationToken ct)
            for (int j = 0; j < 10; j++)
                Task.Delay(1000 * 1).Wait();
                Console.WriteLine(nameof(CancelTest2) + "  Index " + j);



[실행 결과]

CancelTest2  Index 0
CancelTest1  Index 0
System.AggregateException: 하나 이상의 오류가 발생했습니다. ---> System.Threading.Tasks.TaskCanceledException: 작업이 취소되었습니다.
   --- 내부 예외 스택 추적의 끝 ---
   위치: System.Threading.Tasks.Task.WaitAll(Task[] tasks, Int32 millisecondsTimeout, CancellationToken cancellationToken)
   위치: System.Threading.Tasks.Task.WaitAll(Task[] tasks, Int32 millisecondsTimeout)
   위치: System.Threading.Tasks.Task.WaitAll(Task[] tasks)
   위치: ConsoleApp1.Program.Main(String[] args)
---> (내부 예외 #0) System.Threading.Tasks.TaskCanceledException: 작업이 취소되었습니다.<---

---> (내부 예외 #1) System.Threading.Tasks.TaskCanceledException: 작업이 취소되었습니다.<---

The task running CancelTest1 was cancelled.
The task running CancelTest2 was cancelled.


Task Chaning Continuation

프로그램 실행시 이전 Task의 실행 결과를 가지고 새로운 Task를 실행 해야 하는 경우가 발생 한다.

이 때 .net TPL 에서 지원해 주는 기능이 Task.ContinueWith 이다.


[샘플 코드]

var t1 = Task.Factory.StartNew(() => GenerateTest(ct), ct);
var t2 = t1.ContinueWith((t) =>
	//Task(t1) 의 델리게이트 반환결과를 가지고 작업.
	for (int i = 0; i < t.Result.Count; i++)

[Save Zip Example]

Sting(Text)를 Zip파일로 저장.

public void SaveFileWithString()
  using (var zip = new ZipFile(Encoding.UTF8))
      zip.CompressionLevel = CompressionLevel.BestSpeed;
      //string을 파일로 저장하기 위한 추가.
      string text= "저장할 텍스트";
      zip.AddEntiry("압축파일내 경로", text);
      zip.Save("zip파일 저장 경로")


Directory를 Zip파일로 압축 저장.

public void SaveFileWithDirectory(string addDirectory, string saveZipPath)
    using (var zip = new ZipFile(Encoding.UTF8))
        zip.AddDirectory(addDirectory, "압축파일내 저장경로");

[Load Zip Example]

zip파일 내에 Text파일을 추출 하기.

public void LoadFileWithPath(string loadZipFile, string archiveInPath)
    if (!ZipFile.IsZipFile(loadZipFile)) return;

    using (var zip = ZipFile.Read(loadZipFile))
        var filesInZip = zip.SelectEntries("name = *.txt", archiveInPath); // archiveInPath 경로의 모든 파일 .txt 파일을 추출.

        foreach (var zipEntry in filesInZip)
            if(!zipEntry.IsText) continue; //텍스트 파일인 경우만.

            using (var ms = new MemoryStream())
                ms.Seek(0, SeekOrigin.Begin);
                using (var sr = new StreamReader(ms))
                    var resultString = sr.ReadToEnd();//text 파일을 가져온다.

public delegate object DynamicGetValue<T>(T component);
public delegate void DynamicSetValue<T>(T component, object newValue);

public class DynamicPropertyDescriptor<T> : PropertyDescriptor
    where T: class
    public enum AccessMode
    protected Type _componentType;
    protected Type _propertyType;
    protected DynamicGetValue<T> _getDelegate;
    protected DynamicSetValue<T> _setDelegate;

    public override string DisplayName
            return base.DisplayName;
    public DynamicPropertyDescriptor(string name, Type propertyType,
        DynamicGetValue<T> getDelegate, DynamicSetValue<T> setDelegate, Attribute[] attrs = null)
        base(name, attrs)
        _componentType = typeof(T);
        _propertyType = propertyType;
        _getDelegate = getDelegate;
        _setDelegate = setDelegate;
    public DynamicPropertyDescriptor(string name, Type propertyType, AccessMode accessMode = AccessMode.ReadWrite,  Attribute[] attrs = null)
        : base(name, attrs)
        _componentType = typeof(T);
        _propertyType = propertyType;

        _getDelegate += component => _componentType.GetProperty(name).GetValue(component, null);
            _setDelegate += (component, value) => _componentType.GetProperty(name).SetValue(component, value, null);

    public override bool CanResetValue(object component)
        return false;
        public override Type ComponentType
        get { return _componentType; }

    public override object GetValue(object component)
        return _getDelegate(component as T);

    public override bool IsReadOnly
        get { return _setDelegate == null; }

    public override Type PropertyType
        get { return _propertyType; }

    public override void ResetValue(object component)

    public override void SetValue(object component, object value)
        _setDelegate(component as T, value);

    public override bool ShouldSerializeValue(object component)
        return true;
//위에 클래스가 Dynamic하게 동적으로 PropetyGrid에 Binding하기위한 클래스 이다.
//실제 PropertyGrid에 어떤 이벤트에서 처리를 하냐면

PropertyGrid.CustomPropertyDescriptors += PropertyGrid_CustomPropertyDescriptors;

//이벤트 등록을 완료한 후

 private void _propertyGrid_CustomPropertyDescriptors(object sender, DevExpress.XtraVerticalGrid.Events.CustomPropertyDescriptorsEventArgs e)
     if (e.Source != null)
         e.Properties = new PropertyDescriptorCollection(_propertyDescriptors.ToArray());

//이제 거의 끝났다.. 그렇다면 _propertyDescriptors 는 어떻게 생성할까?
//내경우는 실제 바인딩 객체(=> PropertyGrid.Selectobject 에 Dictionary를 넣었기 때문에 아래와 같이 코딩이 되었다.)

List<DynamicPropertyDescriptor> _propertyDescriptors = new List<DynamicPropertyDescriptor>();

var dynamicPropertyDes = new DynamicPropertyDescriptor<MyType>(
    displayName //속성창에 이름
    , typeof(string) //값의 타입
    , dic => dic.ContainsKey(value.ToString()) ? dic[value.ToString()] : string.Empty // 실제 바인딩된 객체의 Getter
    , (dic, input) =>{dic[value.ToString()] = input.ToString().Trim();}                     //실제 바인딩된 객체의 Setter
    , new []{new CategoryAttribute("Category"), }                                           //Category셋팅


//이렇게 코딩을 하면 원하는 값과 형태로 PropertyGrid에 동적으로 할당할 수 있을 것이다.

Dictionary를 Xelement로 변환 한 후 , XML파일로 저장한다.

저장한 파일을 읽어서 Dictionary로 변환.



//Dictionary to Element:
Dictionary<string, string> dict = new Dictionary<string,string>();
XElement el = new XElement("root",dict.Select(kv => new XElement(kv.Key, kv.Value)));
//Element to Dictionary:
XElement rootElement = XElement.Load("파일 경로");
Dictionary<string, string> dict = new Dictionary<string, string>();
foreach(var el in rootElement.Elements())
   dict.Add(el.Name.LocalName, el.Value);

