コンテンツへスキップ

並行処理

並行処理 vs. 並列処理

「並行処理」と「並列処理」の定義は混同されることがありますが、両者は異なります。

並行システムとは、必ずしも同時に実行するわけではないが、多くのタスクを処理できるシステムのことです。キッチンで料理をしている自分を想像してみてください。玉ねぎを刻み、炒め始め、玉ねぎが炒められている間にトマトを刻みます。しかし、これらの作業をすべて同時に実行しているわけではありません。時間をこれらのタスクに割り当てているのです。並列処理とは、片手で玉ねぎを炒めながら、もう片方の手でトマトを刻むようなものです。

本稿執筆時点では、Crystalは並列処理ではなく並行処理をサポートしています。複数のタスクを実行できますが、それぞれのタスクに多少の時間が費やされます。しかし、2つのコードパスが全く同じ時間に実行されることはありません。

Crystalプログラムは、デフォルトではガベージコレクタ(現在Boehm GC)を除いて、単一のオペレーティングシステムスレッドで実行されます。並列処理はサポートされていますが、現在は実験段階と見なされています。詳細については、Crystalブログの並列処理に関する記事を参照してください。

ファイバー

並行処理を実現するために、Crystalはファイバーを使用します。ファイバーは、オペレーティングシステムのスレッドに似ていますが、はるかに軽量で、その実行はプロセス内部で管理されます。そのため、プログラムは複数のファイバーを生成し、Crystalは適切なタイミングでそれらを実行するようにします。

イベントループ

I/O関連のすべての処理にはイベントループがあります。時間のかかる操作はイベントループに委任され、イベントループがその操作の完了を待っている間に、プログラムは他のファイバーの実行を続けることができます。ソケットを介してデータが到着するのを待つことが、その簡単な例です。

チャネル

Crystalには、CSPに着想を得たチャネルがあります。これにより、メモリを共有することなく、ロック、セマフォ、その他の特別な構造を心配することなく、ファイバー間でデータをやり取りできます。

プログラムの実行

プログラムが開始されると、最上位レベルのコードを実行するメインファイバーが起動されます。そこで、多くの他のファイバーを生成できます。プログラムの構成要素は次のとおりです。

  • ランタイムスケジューラ:適切なタイミングですべてのファイバーを実行する役割を担います。
  • イベントループ:これは別のファイバーであり、ファイル、ソケット、パイプ、シグナル、タイマー(sleepの実行など)のような非同期タスクを担当します。
  • チャネル:ファイバー間でデータを通信します。ランタイムスケジューラは、ファイバーとチャネルの通信を調整します。
  • ガベージコレクタ:「不要になった」メモリをクリーンアップします。

ファイバー

ファイバーは、スレッドよりも軽量な実行単位です。8MBの関連付けられたスタックを持つ小さなオブジェクトであり、これは通常、オペレーティングシステムのスレッドに割り当てられるものです。

ファイバーは、スレッドとは異なり、協調型です。スレッドはプリエンプティブです。オペレーティングシステムはいつでもスレッドを中断し、別のスレッドの実行を開始する可能性があります。ファイバーは、ランタイムスケジューラに明示的に別のファイバーに切り替えるように指示する必要があります。たとえば、待機するI/Oがある場合、ファイバーはスケジューラに「見てください、このI/Oが利用可能になるまで待たなければなりません。そのI/Oの準備が整ったら戻ってきてください」と伝えます。

協調型であることの利点は、コンテキストスイッチ(スレッド間の切り替え)のオーバーヘッドの多くがなくなることです。

ファイバーはスレッドよりもはるかに軽量です。8MBが割り当てられていますが、4KBの小さなスタックから始まります。

64ビットマシンでは、数百万ものファイバーを生成できます。32ビットマシンでは、512個のファイバーしか生成できず、それほど多くありません。しかし、32ビットマシンは時代遅れになりつつあるため、将来に目を向け、64ビットマシンに重点を置いています。

ランタイムスケジューラ

スケジューラには、次のキューがあります。

  • 実行準備ができたファイバー:たとえば、ファイバーを生成すると、実行準備が整います。
  • イベントループ:これは別のファイバーです。実行準備ができた他のファイバーがない場合、イベントループは準備が整った非同期操作があるかどうかを確認し、その操作を待っているファイバーを実行します。イベントループは現在、libeventで実装されており、epollkqueueなどの他のイベントメカニズムの抽象化です。
  • 自発的に待機を要求したファイバー:これはFiber.yieldで行われ、「実行を続けることができますが、必要であれば他のファイバーを実行する時間を与えます」という意味です。

データの通信

現時点ではコードを実行しているスレッドが1つしかないため、異なるファイバーでクラス変数にアクセスして変更しても問題ありません。ただし、言語に複数のスレッド(並列処理)が導入されると、破損する可能性があります。そのため、データの通信に推奨されるメカニズムは、チャネルを使用してメッセージを送信することです。内部的に、チャネルはデータ競合を回避するためのすべてのロックメカニズムを実装していますが、外部からは通信プリミティブとして使用するため、(ユーザーは)ロックを使用する必要はありません。

サンプルコード

ファイバーの生成

ファイバーを生成するには、ブロックを使用してspawnを使用します。

spawn do
  # ...
  socket.gets
  # ...
end

spawn do
  # ...
  sleep 5.seconds
  #  ...
end

ここでは、2つのファイバーがあります。1つはソケットから読み込み、もう1つはsleepを実行します。最初のファイバーがsocket.gets行に到達すると、サスペンドされ、ソケットにデータがあるときにこのファイバーの実行を再開するようにイベントループに指示され、プログラムは2番目のファイバーで続行されます。このファイバーは5秒間スリープしたいので、イベントループは5秒後にこのファイバーを再開するように指示されます。実行する他のファイバーがない場合、イベントループはこれらのイベントのいずれかが発生するまでCPU時間を消費することなく待機します。

socket.getssleepがこのように動作する理由は、それらの実装がランタイムスケジューラとイベントループと直接通信するためです。魔法のようなものはありません。一般的に、標準ライブラリはこれらすべてを処理するので、ユーザーが自分で行う必要はありません。

ただし、ファイバーはすぐに実行されるわけではないことに注意してください。例えば

spawn do
  loop do
    puts "Hello!"
  end
end

上記のコードを実行しても、出力は生成されず、すぐに終了します。

その理由は、ファイバーは生成されたとたんに実行されるわけではないためです。そのため、上記のファイバーを生成するメインファイバーは実行を終了し、プログラムは終了します。

解決策の1つは、sleepを使用することです。

spawn do
  loop do
    puts "Hello!"
  end
end

sleep 1.second

このプログラムは、1秒間「Hello!」を出力してから終了します。これは、sleep呼び出しによってメインファイバーが1秒後に実行されるようにスケジュールされ、その後、「実行準備完了」状態の別のファイバー(この場合は上記のファイバー)が実行されるためです。

別の方法としては、以下があります。

spawn do
  loop do
    puts "Hello!"
  end
end

Fiber.yield

今回は、Fiber.yieldによってスケジューラに他のファイバーを実行するように指示されます。「Hello!」が標準出力でブロックされるまで(システムコールによって、出力が準備できるまで待機する必要があることが通知されます)、出力され、その後、メインファイバーで実行が続行され、プログラムは終了します。ここでは、標準出力がブロックされない可能性があるため、プログラムは永遠に実行を続ける可能性があります。

生成されたファイバーを永遠に実行したい場合は、引数なしでsleepを使用できます。

spawn do
  loop do
    puts "Hello!"
  end
end

sleep

もちろん、上記のプログラムはspawnを使用せずに、ループだけで記述することもできます。sleepは、複数のファイバーを生成する場合により便利です。

呼び出しの生成

ブロックの代わりにメソッド呼び出しを渡して生成することもできます。これがなぜ便利なのかを理解するために、次の例を見てみましょう。

i = 0
while i < 10
  spawn do
    puts(i)
  end
  i += 1
end

Fiber.yield

上記のプログラムは「10」を10回出力します。問題は、生成されたすべてのファイバーが参照するiという変数が1つしかないことであり、Fiber.yieldが実行されるとその値は10になることです。

これを解決するには、次のようにします。

i = 0
while i < 10
  proc = ->(x : Int32) do
    spawn do
      puts(x)
    end
  end
  proc.call(i)
  i += 1
end

Fiber.yield

これで機能します。これは、Procを作成し、iを渡して呼び出しているため、値がコピーされ、生成されたファイバーがコピーを受け取るからです。

この定型コードを回避するために、標準ライブラリは呼び出し式を受け入れるspawnマクロを提供しており、基本的に上記の処理を行うように書き換えます。これを使用すると、次のようになります。

i = 0
while i < 10
  spawn puts(i)
  i += 1
end

Fiber.yield

これは主に、反復処理で変化するローカル変数で役立ちます。ブロック引数ではこれは発生しません。例えば、これは期待通りに動作します。

10.times do |i|
  spawn do
    puts i
  end
end

Fiber.yield

ファイバーの生成と完了待ち

これにはチャネルを使用できます。

channel = Channel(Nil).new

spawn do
  puts "Before send"
  channel.send(nil)
  puts "After send"
end

puts "Before receive"
channel.receive
puts "After receive"

これは以下を出力します。

Before receive
Before send
After send
After receive

まず、プログラムはファイバーを生成しますが、まだ実行しません。channel.receiveを呼び出すと、メインファイバーがブロックされ、生成されたファイバーで実行が続行されます。次に、channel.send(nil)が呼び出されます。このsendは、receiveが最初のsendの前に呼び出されているため、チャネルに空間を占有しません。sendはブロックされません。ファイバーは、ブロックされているか、完了まで実行されている場合にのみ切り替わります。したがって、生成されたファイバーはsendの後で続行され、puts "After send"が実行されると、実行はメインファイバーに戻ります。

次に、メインファイバーは値を待っていたchannel.receiveで再開します。その後、メインファイバーは実行を続け、終了します。

上記の例では、ファイバーが終了したことを伝えるためにnilを使用しました。チャネルを使用して、ファイバー間で値をやり取りすることもできます。

channel = Channel(Int32).new

spawn do
  puts "Before first send"
  channel.send(1)
  puts "Before second send"
  channel.send(2)
end

puts "Before first receive"
value = channel.receive
puts value # => 1

puts "Before second receive"
value = channel.receive
puts value # => 2

出力

Before first receive
Before first send
Before second send
1
Before second receive
2

プログラムがreceiveを実行すると、現在のファイバーがブロックされ、他のファイバーで実行が続行されることに注意してください。channel.send(1)が実行されると、チャネルがいっぱいではない場合、sendはノンブロッキングであるため、実行が続行されます。しかし、channel.send(2)は、チャネル(デフォルトではサイズが1)がいっぱいであるため、ファイバーをブロックし、そのチャネルを待っていたファイバーで実行が続行されます。

ここではリテラル値を送信していますが、生成されたファイバーは、たとえばファイルを読み取ったり、ソケットから取得したりすることで、この値を計算する可能性があります。このファイバーがI/Oを待機する必要がある場合、他のファイバーはI/Oの準備ができるまでコードの実行を続け、最終的に値の準備ができてチャネルを介して送信されると、メインファイバーがそれを受信します。例えば

require "socket"

channel = Channel(String).new

spawn do
  server = TCPServer.new("0.0.0.0", 8080)
  socket = server.accept
  while line = socket.gets
    channel.send(line)
  end
end

spawn do
  while line = gets
    channel.send(line)
  end
end

3.times do
  puts channel.receive
end

上記のプログラムは2つのファイバーを生成します。最初のファイバーはTCPServerを作成し、1つの接続を受け入れ、そこから行を読み取り、チャネルに送信します。2番目のファイバーは標準入力から行を読み取ります。メインファイバーは、ソケットまたは標準入力のいずれかからチャネルに送信された最初の3つのメッセージを読み取り、プログラムは終了します。gets呼び出しは、ファイバーをブロックし、データが到着した場合にそこでイベントループを続行するように指示します。

同様に、複数のファイバーの実行が完了するのを待って、それらの値を収集することもできます。

channel = Channel(Int32).new

10.times do |i|
  spawn do
    channel.send(i * 2)
  end
end

sum = 0
10.times do
  sum += channel.receive
end
puts sum # => 90

もちろん、生成されたファイバー内でreceiveを使用することもできます。

channel = Channel(Int32).new

spawn do
  puts "Before send"
  channel.send(1)
  puts "After send"
end

spawn do
  puts "Before receive"
  puts channel.receive
  puts "After receive"
end

puts "Before yield"
Fiber.yield
puts "After yield"

出力

Before yield
Before send
Before receive
1
After receive
After send
After yield

ここでは、最初にchannel.sendが実行されますが、値を待っているものがないため(まだ)、他のファイバーで実行が続行されます。2番目のファイバーが実行され、チャネルに値があるので、取得され、実行が続行されます。最初に最初のファイバー、次にメインファイバーで実行が続行されます。これは、Fiber.yieldがファイバーを実行キューの最後に配置するためです。

バッファ付きチャネル

上記の例では、バッファされていないチャネルを使用しています。値を送信する場合、ファイバーがそのチャネルを待っている場合、そのファイバーで実行が続行されます。

バッファ付きチャネルでは、sendを呼び出しても、バッファがいっぱいになるまで他のファイバーに切り替わりません。

# A buffered channel of capacity 2
channel = Channel(Int32).new(2)

spawn do
  puts "Before send 1"
  channel.send(1)
  puts "Before send 2"
  channel.send(2)
  puts "Before send 3"
  channel.send(3)
  puts "After send"
end

3.times do |i|
  puts channel.receive
end

出力

Before send 1
Before send 2
Before send 3
After send
1
2
3

最初のsendはチャネルに空間を占有しません。これは、最初のsendの前にreceiveが呼び出されているためです。一方、他の2つのsend呼び出しは、それぞれのreceiveの前に発生します。send呼び出しの数はバッファの境界を超えないため、sendファイバーは中断されることなく完了まで実行されます。

バッファのすべての空間が占有される例を次に示します。

# A buffered channel of capacity 1
channel = Channel(Int32).new(1)

spawn do
  puts "Before send 1"
  channel.send(1)
  puts "Before send 2"
  channel.send(2)
  puts "Before send 3"
  channel.send(3)
  puts "End of send fiber"
end

3.times do |i|
  puts channel.receive
end

出力

Before send 1
Before send 2
Before send 3
1
2
3

「End of send fiber」が出力に表示されないのは、3つのsend呼び出しをreceiveしているため、3.timesが完了まで実行され、結果としてメインファイバーのブロックが解除され、完了まで実行されるためです。

これは、まさに今見たスニペットと同じですが、一番下にFiber.yield呼び出しが追加されています。

# A buffered channel of capacity 1
channel = Channel(Int32).new(1)

spawn do
  puts "Before send 1"
  channel.send(1)
  puts "Before send 2"
  channel.send(2)
  puts "Before send 3"
  channel.send(3)
  puts "End of send fiber"
end

3.times do |i|
  puts channel.receive
end

Fiber.yield

出力

Before send 1
Before send 2
Before send 3
1
2
3
End of send fiber

スニペットの最後にFiber.yield呼び出しを追加することで、メインファイバーが完了まで実行されるために見逃されていた「End of send fiber」メッセージが出力に表示されます。