Java - Concurrency - Đa luồng

Giống như nhiều ngôn ngữ khác, java cũng hỗ trợ đa luồng, đa nhiệm, 2 đơn vị căn bản là process và thread.
Nếu như 1 process sở hữu cho mình 1 môi trường thực thi riêng, bao gồm bộ nhớ, tài nguyên, thì thread, lại chia sẻ tài nguyên, bộ nhớ của process chứa nó.
Nói chung process và thread là những khái niệm khá cơ bản trong vấn đề đa nhiệm, vì vậy bạn nên tìm hiểu về nền tảng và sau đó hãy xem xét đến môi trường của java.
Ngay cả CPU 1 core thì cũng đảm nhận đa nhiệm, nhiều process, thread cùng lúc được, vì CPU biết các chia thời gian xử lý thành những phần nhỏ gọi là slicing và xen lẫn các thread vào các slicing đó.
Một chương trình có thể có nhiều process liên kết với nhau qua IPC (pipes, sockets,...), 1 process có ít nhất 1 thread, khi chúng ta tạo 1 ứng dụng đơn giản nhất thì mặc định chúng ta có main thread, main thread có thể tạo các thread mới...
Chúng ta sẽ chủ yếu nói về Thread.

1. Thread Object:
a. Định nghĩa và khởi động 1 thread:

Có 2 cách sau định nghĩa và khởi động 1 process:

a1. Implements Runnable interface: Chúng ta phải Override run() method

public class HelloRunnable implements Runnable {
    public void run() {
        System.out.println("Hello from a thread!");
    }

    public static void main(String args[]) { 
        HelloRunnable run = new HelloRunnable();
        Thread t = new Thread(run, "HELLO THREAD !"); 
        t.start();
    }
}

a2. Extends Thread class: Chúng ta cũng phải Override run() method

public class HelloThread extends Thread {
    public void run() {
        System.out.println("Hello from a thread!");
    }

    public static void main(String args[]) {
        HelloThread t = new HelloThread();
        t.start();
    }
}

Chúng ta sẽ dùng cách thứ 1 vì Runnable Object có thể subclass của nhiều class khác ngoài Thread.

b. Sleep 1 thread:

Làm cho thread suspend 1 khoảng thời gian được chỉ định. Để sử dụng được chúng ta phải làm thủ tục throw/catch InterruptedException. Hãy xem 1 ví dụ:

public class SleepMessages {
    public static void main(String args[])
        throws InterruptedException {
        String importantInfo[] = {
            "Mares eat oats",
            "Does eat oats",
            "Little lambs eat ivy",
            "A kid will eat ivy too"
        };

        for (int i = 0; i < importantInfo.length; i++) {
            //Pause for 4 seconds
            Thread.sleep(4000);
            //Print a message
            System.out.println(importantInfo[i]);
        }
    }
}

c. Interrup

Dừng thực thi công việc hiện tại để làm việc khác, tuy nhiên, đa phần là terminal thread.
Để thread bị interrupt hoạt động đúng thì thread cần "Hỗ trợ Interruption":

- Đối với thread thực thi thường xuyên các method gây InterruptedException (như sleep, join,...) thì cần return từ run() method ngay khi catch Exception này.

public void run() {
  for (int i = 0; i < importantInfo.length; i++) {
    // Pause for 4 seconds
    try {
        Thread.sleep(4000);
    } catch (InterruptedException e) {
        // We've been interrupted: no more messages.
        return;
    }
    // Print a message
    System.out.println(importantInfo[i]);
  }
} 

- Đối với những thread lâu không thực thi các method gây InterruptedException, cần định kỳ kiểm tra và return như thế này:

if (Thread.interrupted()) {
        // We've been interrupted: no more crunching.
        return;
    }

hay

if (Thread.interrupted()) {
    throw new InterruptedException();
}

d. Join

t.join() hay t.join(miliseconds) có nghĩa là thread hiện tại (thread gọi method này) sẽ tạm ngưng và chỉ trở lại khi thread t thực thi xong (hoặc 1 khoảng thời gian nếu chỉ định số miliseconds)

2. Synchronization
Các thread có thể chia sẻ với nhau quyền truy cập đến các field cũng như là các object được reference.tới. Việc này mang lại hiệu quả nhưng nhiều khi gây nên nhiều lỗi như thread interference hay memory consistency error.
Chúng ta có thể dùng Synchronization để giải quyết các vấn đề trên (thực ra việc này cũng có thể gây ra lỗi thread contention - gây chậm chương trình, các thread liên quan)

a. Thread Interference:
Đây là lỗi khi nhiều thread cùng truy cập và xử lý với cùng 1 đối tượng trong cùng khoảng thời gian, dẫn tói các thread gây can thiệp lẫn nhau không mong muốn. Ví dụ:

class Counter {
    private int c = 0;

    public void increment() {
        c++;
    }

    public void decrement() {
        c--;
    }

    public int value() {
        return c;
    }

}

Bạn có tưởng tượng được gì nếu 2 thread A và B cùng gọi và xử lý 1 Counter object, thread A gọi increment() method, thread B gọi decrement() method, sau đó cả 2 gọi value() method để in kết quả.
Nếu xuất phát ban đầu là c=0, thì:
  1. Thread A: Nhận giá trị từ c.
  2. Thread B: Nhận giá trị từ c.
  3. Thread A: Increment giá trị mới nhận; kết qủa là 1.
  4. Thread B: Decrement giá trị mới nhận; kết quả là -1.
  5. Thread A: Lưu giá trị vào c; c = 1.
  6. Thread B: Lưu giá trị vào c; c = -1.
Cuối cùng khi A gọi value để in kết quả thì c = -1 (giá trị sau cùng)

b. Memory consistency error:
Giả sử ta có:
c = 0;
#1: c++;        thread A
#2: System.out.println(c);       thread B
Vấn đề là, không phải lúc nào thread A cũng thực hiện lệnh của mình trước thread B, và khi đó, mọi ý đồ của coder sẽ bị phá vỡ.
Lỗi này được nói chung là: các thread khác nhau có cách nhìn không tương xứng mà lẽ ra phải có với cùng 1 dữ liệu.
Có thể tránh lỗi này với thủ tục gọi là happens-before relationship.

c. Happens-before relationship:
Trước hết chúng ta cần hiểu rằng, thứ tự của các dòng lệnh trong mã nguồn chương trình hoàn toàn có thể bị đảo lộn, sắp xếp lại bởi compiler, JVM hay processer, điều này để dảm bảo tính hiệu quả và yêu cầu từ các nhà phát triển khác nhau.
Như vậy, nguy cơ về chuyện 1 lệnh ở 1 thread thực thi nhưng với những dữ liệu chưa được cập nhật đúng cách trên bộ nhớ - do nó không "nhìn thấy" kết quả từ lênh trước theo như ý đồ của coder (phần trên đã nói). Để khắc phục điều này, thủ tục happens-before relationship được đề xướng.
Nếu A và B được gọi trong cùng thread, chúng ta luôn được đảm bảo chúng sẽ được gọi theo thứ tự nhu trong mã nguồn.
Nếu khác thread,  A happens-before relationship với B, đảm bảo rằng A sẽ được gọi trước B hay nói cách khác B nhìn thấy kết quả từ A.
Nhưng chúng ta phải chú ý 1 điều, A và B ở đây không phải 1 chỉ lệnh, mà là 1 action, bao gồm các thao tác: Đọc/ viết các biến, lock/ unlock monitor, starting/ joining thread.
Sở dĩ như vậy vì 1 lệnh đôi khi không cần thiết phải giữ đúng thứ tự, nếu điều đó không ảnh hưởng đến kết quả, ví dụ:
x = 3;  // (1)
y = 4;  // (2)
...
Ở đây lệnh (1) và (2) có thể đổi thứ tự được.
Synchronization là 1 trong nhiều cách thực hiện happens-before relationship

d. Synchronization:
Như các vấn đề đã nói trên, chúng ta cần đảm bảo rằng tại 1 thời điểm chỉ 1 thread được truy cập 1 đối tượng, thread này thực hiện xong đến lượt thread khác.
Có 2 cách chính: Syn. method và Syn. statement:
d1. Syn. method:

public class SynchronizedCounter {
    private int c = 0;

    public synchronized void increment() {
        c++;
    }

    public synchronized void decrement() {
        c--;
    }

    public synchronized int value() {
        return c;
    }
}

Như ta thấy, chỉ cần thêm keyword: synchronized trước method là được. Với cách này ta có 2 hệ quả sau:
- Đảm bảo rằng, trong 1 thời điểm, 1 synchronized method của 1 đối tượng chỉ được thực hiện bởi 1 thread (các thread khác sẽ bị block nếu cùng nhắm tới method này, cho đến khi thread kia hoàn thành)
- Sau khi 1 synchronized method exit, nó tự động tạo happens-before relationship với bất cứ method được gọi theo sau nào của cùng đối tượng.
Chú ý constructor không synchronized được và cũng không cần (vì với 1 đối tượng thì nó chỉ được gọi 1 lần bởi 1 thread)

d2. Synchronized statement (block):
Trước hết chúng ta nói qua về lock hay monitor. Mỗi object có cho mình 1 lock, 1 thread muốn làm việc với object này phải "own" được lock của nó. Trước khi thực thi với object, thread phải acquire lock, sau khi thực hiện xong, thread release lock này. Trong khi 1 thread đang own 1 object clock, các thread khác phải đợi cho đến khi thread đó nhả ra (release).
Synchronized statement khác Synchronized method ở chỗ Synchronized statement chỉ định cụ thể object.

public class Counter {
  private int count = 0;
  public void increment() {
    synchronized (this) {
      count++;
    }
  }
  public int getCount() {
    synchronized (this) {
      return count;
    }
  }
}

Cơ chế của việc này như sau: khi nhận thấy có từ khóa Synchronized, compiler làm cho các khối lệnh làm thủ tục own lock của object được chỉ định, như vậy ta đảm bảo được tính đồng bộ hóa cho các thread với đối tượng này. Ví dụ:

class Callme {
   void call(String msg) {
      synchronized(this) {
        System.out.print("[" + msg);
        try {
           Thread.sleep(1000);
        } catch (InterruptedException e) {
           System.out.println("Interrupted");
        }
        System.out.println("]");
      }
   }
}

class Caller implements Runnable {
   String msg;
   Callme target;
   Thread t;
   public Caller(Callme targ, String s) {
      target = targ;
      msg = s;
      t = new Thread(this);
      t.start();
   }
 
   // synchronize calls to call()
   public void run() {
      //synchronized(target) { // synchronized block
         target.call(msg);
      //}
   }
}
public class Synch {
   public static void main(String args[]) {
      Callme myTarget = new Callme();
      Caller ob1 = new Caller(myTarget, "Hello");
      Caller ob2 = new Caller(myTarget, "Synchronized");
      Caller ob3 = new Caller(myTarget, "World");
 
      // wait for threads to end
      try {
         ob1.t.join();
         ob2.t.join();
         ob3.t.join();
      } catch(InterruptedException e) {
         System.out.println("Interrupted");
      }
   }
}

3. Một số vấn đề với multithread:
a. Deadlock:
Sửu dụng từ khóa synchronized để đồng bộ hóa cũng gây ra những vấn đề ngoài ý muốn. Deadlock là vấn đề khi 2 hay nhiều thread đợi lẫn nhau, dẫn đến vô thời hạn, thread đợi lẫn nhau vì thread A đang giữ lock của đối tượng a, và có 1 method nào đấy muốn truy cập đến đối tượng b được lock bởi thread B và thế là thread A phải đợi thread B làm xong phần việc và nhả b ra, rủi thay thread B cũng cần truy cập qua a (ngược lại), và thế là 2 thread đợi nhau trong... vô vọng.
Ví dụ 1:

public class MyDeadlock {
    private static MyDeadlock a = new MyDeadlock();
    private static MyDeadlock b = new MyDeadlock();
    public synchronized void testFunc(MyDeadlock friend) {
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + ": Inside testFunc !");
        System.out.println(threadName + ": Prepare to get friend cross...");
        try {       // cause to asynchronization if not use synchronized keyword
            Thread.sleep(2000);
        } catch(InterruptedException e) {
            System.out.println(threadName + " is interrupted !");
        }
        friend.cross();
    }
    public synchronized void cross() {
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + ": Inside croos !");
    }
    static class MyRunnable implements Runnable {
        public void run() {
            a.testFunc(b);      // by additional thread (t)
        }
    }
    public static void main(String... args) {
        Thread t = new Thread(new MyRunnable(), "DEMO THREAD");
        t.start();
        b.testFunc(a);      // by Main thread
    }
}

Hoặc ví dụ 2:

  public static Object cacheLock = new Object();
  public static Object tableLock = new Object();
  ...
  public void oneMethod() {
    synchronized (cacheLock) {
      synchronized (tableLock) { 
        doSomething();
      }
    }
  }
  public void anotherMethod() {
    synchronized (tableLock) {
      synchronized (cacheLock) { 
        doSomethingElse();
      }
    }
  }

Ở ví dụ 2, nếu thread A thực thi oneMethod(), threadB thực thi anotherMethod(). Chúng ta hãy tưởng tượng khi thread A gọi oneMethod(), nó sẽ lock cacheLock đầu tiên, thread B gọi anotherMethod(), nó sẽ lock tableLock. Sau đó, thread A đòi lock tiếp tableLock, tương tự thread B cũng đòi lock tiếp cacheLock (theo thứ tự trong mỗi method). Và deadlock xảy ra.

Ví dụ 3:

// File Name ThreadSafeBankAccount.java
public class ThreadSafeBankAccount
{
   private double balance;
   private int number;
   public ThreadSafeBankAccount(int num, double initialBalance)
   {
      balance = initialBalance;
      number = num;
   }
   public int getNumber()
   {
      return number;
   }
   public double getBalance()
   {
      return balance;
   }
   public void deposit(double amount)
   {
      synchronized(this)
      {
        double prevBalance = balance;
        try
        {
           Thread.sleep(4000);
        }catch(InterruptedException e)
        {}
        balance = prevBalance + amount;
      }
   }
   public void withdraw(double amount)
   {
      synchronized(this)
      {
      double prevBalance = balance;
         try
         {
            Thread.sleep(4000);
         }catch(InterruptedException e)
         {}
         balance = prevBalance - amount;
      }
   }
}

// File Name LazyTeller.java
public class LazyTeller extends Thread
{
   private ThreadSafeBankAccount source, dest;
   public LazyTeller(ThreadSafeBankAccount a, 
                     ThreadSafeBankAccount b)
   {
      source = a;
      dest = b;
   }
   public void run()
   {
      transfer(250.00);
   }
   public void transfer(double amount)
   {
      System.out.println("Transferring from "
          + source.getNumber() + " to " + dest.getNumber());
      synchronized(source)
      {
          Thread.yield();
          synchronized(dest)
          {
             System.out.println("Withdrawing from "
                     + source.getNumber());
             source.withdraw(amount);
             System.out.println("Depositing into "
                     + dest.getNumber());
             dest.deposit(amount);
          }
       }
   }
}
public class DeadlockDemo
{
   public static void main(String[] args)
   {
      System.out.println("Creating two bank accounts...");
      ThreadSafeBankAccount checking =
                    new ThreadSafeBankAccount(101, 1000.00);
      ThreadSafeBankAccount savings =
                    new ThreadSafeBankAccount(102, 5000.00);

      System.out.println("Creating two teller threads...");
      Thread teller1 = new LazyTeller(checking, savings);
      Thread teller2 = new LazyTeller(savings, checking);
      System.out.println("Starting both threads...");
      teller1.start();
      teller2.start();
   }
}
Khi teller1 gọi transfer(), nó lock source - chính là checking, cùng lúc teller2 gọi transfer() và lock dest - tức saving. Ngay sau khi lock checking, teller1 đòi lock cả saving, ngược lại teller2 lại đòi lock checking (nghiệp vụ transfer đòi hỏi có source và dest để chuyển tiền !). Và thế là deadlock.

Khắc phục/ tránh deadlock:
Trong vd 2 và 3, để ý nếu ta biết cách xếp đặt thứ tự lock các đối tượng của các thread giống nhau thì sẽ giải quyết được.

  public static Object cacheLock = new Object();
  public static Object tableLock = new Object();
  ...
  public void oneMethod() {
    synchronized (cacheLock) {
      synchronized (tableLock) { 
        doSomething();
      }
    }
  }
  public void anotherMethod() { 
    synchronized (cacheLock) { 
      synchronized (tableLock) {
        doSomethingElse();
      }
    }
  }

ví dụ 3: sửa method transfer():

public void transfer(double amount)
   {
       System.out.println("Transferring from " + source.getNumber()
           + " to " + dest.getNumber());
       ThreadSafeBankAccount first, second;
       if(source.getNumber() < dest.getNumber())
       {
          first = source;
          second = dest;
       }
       else
       {
          first = dest; 
          second = source;
       }
       synchronized(first)
       {
          Thread.yield();
          synchronized(second)
          {
             System.out.println("Withdrawing from "
                         + source.getNumber());
             source.withdraw(amount);
             System.out.println("Depositing into "
                         + dest.getNumber());
             dest.deposit(amount);
          }
      }
   }
(dùng first và second làm trung gian để đảm bảo 2 thread luôn lock object có getName() trả về nhỏ hơn - first, trước).
Một phương pháp nữa là giảm scope giữa các phương thức đồng bộ (hạn chế việc 2 các method này ở các lớp khác nhau,...) xuống tối đa có thể.

b. Starvation:
Một thread không thể truy cập tài nguyên, biến... do bị chiếm bởi 1 thread tham lam khác
c.Livelock:
Một vấn đề nữa là tình trạng 2 hay nhiều thread cùng truy cập đến 1 tài nguyên, biến,... dùng chung, khác với deadlock - các thread bị block, không có sự hoạt động, trong livelock, các thread luôn thay đổi trạng thái mặc dù không có sự tiến triển nào của chương trình.


Nhận xét