並行処理¶
並行処理 vs. 並列処理¶
「並行処理」と「並列処理」の定義は混同されることがありますが、両者は異なります。
並行システムとは、必ずしも同時に実行するわけではないが、多くのタスクを処理できるシステムのことです。キッチンで料理をしている自分を想像してみてください。玉ねぎを刻み、炒め始め、玉ねぎが炒められている間にトマトを刻みます。しかし、これらの作業をすべて同時に実行しているわけではありません。時間をこれらのタスクに割り当てているのです。並列処理とは、片手で玉ねぎを炒めながら、もう片方の手でトマトを刻むようなものです。
本稿執筆時点では、Crystalは並列処理ではなく並行処理をサポートしています。複数のタスクを実行できますが、それぞれのタスクに多少の時間が費やされます。しかし、2つのコードパスが全く同じ時間に実行されることはありません。
Crystalプログラムは、デフォルトではガベージコレクタ(現在Boehm GC)を除いて、単一のオペレーティングシステムスレッドで実行されます。並列処理はサポートされていますが、現在は実験段階と見なされています。詳細については、Crystalブログの並列処理に関する記事を参照してください。
ファイバー¶
並行処理を実現するために、Crystalはファイバーを使用します。ファイバーは、オペレーティングシステムのスレッドに似ていますが、はるかに軽量で、その実行はプロセス内部で管理されます。そのため、プログラムは複数のファイバーを生成し、Crystalは適切なタイミングでそれらを実行するようにします。
イベントループ¶
I/O関連のすべての処理にはイベントループがあります。時間のかかる操作はイベントループに委任され、イベントループがその操作の完了を待っている間に、プログラムは他のファイバーの実行を続けることができます。ソケットを介してデータが到着するのを待つことが、その簡単な例です。
チャネル¶
Crystalには、CSPに着想を得たチャネルがあります。これにより、メモリを共有することなく、ロック、セマフォ、その他の特別な構造を心配することなく、ファイバー間でデータをやり取りできます。
プログラムの実行¶
プログラムが開始されると、最上位レベルのコードを実行するメインファイバーが起動されます。そこで、多くの他のファイバーを生成できます。プログラムの構成要素は次のとおりです。
- ランタイムスケジューラ:適切なタイミングですべてのファイバーを実行する役割を担います。
- イベントループ:これは別のファイバーであり、ファイル、ソケット、パイプ、シグナル、タイマー(
sleep
の実行など)のような非同期タスクを担当します。 - チャネル:ファイバー間でデータを通信します。ランタイムスケジューラは、ファイバーとチャネルの通信を調整します。
- ガベージコレクタ:「不要になった」メモリをクリーンアップします。
ファイバー¶
ファイバーは、スレッドよりも軽量な実行単位です。8MBの関連付けられたスタックを持つ小さなオブジェクトであり、これは通常、オペレーティングシステムのスレッドに割り当てられるものです。
ファイバーは、スレッドとは異なり、協調型です。スレッドはプリエンプティブです。オペレーティングシステムはいつでもスレッドを中断し、別のスレッドの実行を開始する可能性があります。ファイバーは、ランタイムスケジューラに明示的に別のファイバーに切り替えるように指示する必要があります。たとえば、待機するI/Oがある場合、ファイバーはスケジューラに「見てください、このI/Oが利用可能になるまで待たなければなりません。そのI/Oの準備が整ったら戻ってきてください」と伝えます。
協調型であることの利点は、コンテキストスイッチ(スレッド間の切り替え)のオーバーヘッドの多くがなくなることです。
ファイバーはスレッドよりもはるかに軽量です。8MBが割り当てられていますが、4KBの小さなスタックから始まります。
64ビットマシンでは、数百万ものファイバーを生成できます。32ビットマシンでは、512個のファイバーしか生成できず、それほど多くありません。しかし、32ビットマシンは時代遅れになりつつあるため、将来に目を向け、64ビットマシンに重点を置いています。
ランタイムスケジューラ¶
スケジューラには、次のキューがあります。
- 実行準備ができたファイバー:たとえば、ファイバーを生成すると、実行準備が整います。
- イベントループ:これは別のファイバーです。実行準備ができた他のファイバーがない場合、イベントループは準備が整った非同期操作があるかどうかを確認し、その操作を待っているファイバーを実行します。イベントループは現在、
libevent
で実装されており、epoll
やkqueue
などの他のイベントメカニズムの抽象化です。 - 自発的に待機を要求したファイバー:これは
Fiber.yield
で行われ、「実行を続けることができますが、必要であれば他のファイバーを実行する時間を与えます」という意味です。
データの通信¶
現時点ではコードを実行しているスレッドが1つしかないため、異なるファイバーでクラス変数にアクセスして変更しても問題ありません。ただし、言語に複数のスレッド(並列処理)が導入されると、破損する可能性があります。そのため、データの通信に推奨されるメカニズムは、チャネルを使用してメッセージを送信することです。内部的に、チャネルはデータ競合を回避するためのすべてのロックメカニズムを実装していますが、外部からは通信プリミティブとして使用するため、(ユーザーは)ロックを使用する必要はありません。
サンプルコード¶
ファイバーの生成¶
ファイバーを生成するには、ブロックを使用してspawn
を使用します。
spawn do
# ...
socket.gets
# ...
end
spawn do
# ...
sleep 5.seconds
# ...
end
ここでは、2つのファイバーがあります。1つはソケットから読み込み、もう1つはsleep
を実行します。最初のファイバーがsocket.gets
行に到達すると、サスペンドされ、ソケットにデータがあるときにこのファイバーの実行を再開するようにイベントループに指示され、プログラムは2番目のファイバーで続行されます。このファイバーは5秒間スリープしたいので、イベントループは5秒後にこのファイバーを再開するように指示されます。実行する他のファイバーがない場合、イベントループはこれらのイベントのいずれかが発生するまでCPU時間を消費することなく待機します。
socket.gets
とsleep
がこのように動作する理由は、それらの実装がランタイムスケジューラとイベントループと直接通信するためです。魔法のようなものはありません。一般的に、標準ライブラリはこれらすべてを処理するので、ユーザーが自分で行う必要はありません。
ただし、ファイバーはすぐに実行されるわけではないことに注意してください。例えば
spawn do
loop do
puts "Hello!"
end
end
上記のコードを実行しても、出力は生成されず、すぐに終了します。
その理由は、ファイバーは生成されたとたんに実行されるわけではないためです。そのため、上記のファイバーを生成するメインファイバーは実行を終了し、プログラムは終了します。
解決策の1つは、sleep
を使用することです。
spawn do
loop do
puts "Hello!"
end
end
sleep 1.second
このプログラムは、1秒間「Hello!」を出力してから終了します。これは、sleep
呼び出しによってメインファイバーが1秒後に実行されるようにスケジュールされ、その後、「実行準備完了」状態の別のファイバー(この場合は上記のファイバー)が実行されるためです。
別の方法としては、以下があります。
spawn do
loop do
puts "Hello!"
end
end
Fiber.yield
今回は、Fiber.yield
によってスケジューラに他のファイバーを実行するように指示されます。「Hello!」が標準出力でブロックされるまで(システムコールによって、出力が準備できるまで待機する必要があることが通知されます)、出力され、その後、メインファイバーで実行が続行され、プログラムは終了します。ここでは、標準出力がブロックされない可能性があるため、プログラムは永遠に実行を続ける可能性があります。
生成されたファイバーを永遠に実行したい場合は、引数なしでsleep
を使用できます。
spawn do
loop do
puts "Hello!"
end
end
sleep
もちろん、上記のプログラムはspawn
を使用せずに、ループだけで記述することもできます。sleep
は、複数のファイバーを生成する場合により便利です。
呼び出しの生成¶
ブロックの代わりにメソッド呼び出しを渡して生成することもできます。これがなぜ便利なのかを理解するために、次の例を見てみましょう。
i = 0
while i < 10
spawn do
puts(i)
end
i += 1
end
Fiber.yield
上記のプログラムは「10」を10回出力します。問題は、生成されたすべてのファイバーが参照するi
という変数が1つしかないことであり、Fiber.yield
が実行されるとその値は10になることです。
これを解決するには、次のようにします。
i = 0
while i < 10
proc = ->(x : Int32) do
spawn do
puts(x)
end
end
proc.call(i)
i += 1
end
Fiber.yield
これで機能します。これは、Procを作成し、i
を渡して呼び出しているため、値がコピーされ、生成されたファイバーがコピーを受け取るからです。
この定型コードを回避するために、標準ライブラリは呼び出し式を受け入れるspawn
マクロを提供しており、基本的に上記の処理を行うように書き換えます。これを使用すると、次のようになります。
i = 0
while i < 10
spawn puts(i)
i += 1
end
Fiber.yield
これは主に、反復処理で変化するローカル変数で役立ちます。ブロック引数ではこれは発生しません。例えば、これは期待通りに動作します。
10.times do |i|
spawn do
puts i
end
end
Fiber.yield
ファイバーの生成と完了待ち¶
これにはチャネルを使用できます。
channel = Channel(Nil).new
spawn do
puts "Before send"
channel.send(nil)
puts "After send"
end
puts "Before receive"
channel.receive
puts "After receive"
これは以下を出力します。
Before receive
Before send
After send
After receive
まず、プログラムはファイバーを生成しますが、まだ実行しません。channel.receive
を呼び出すと、メインファイバーがブロックされ、生成されたファイバーで実行が続行されます。次に、channel.send(nil)
が呼び出されます。このsend
は、receive
が最初のsend
の前に呼び出されているため、チャネルに空間を占有しません。send
はブロックされません。ファイバーは、ブロックされているか、完了まで実行されている場合にのみ切り替わります。したがって、生成されたファイバーはsend
の後で続行され、puts "After send"
が実行されると、実行はメインファイバーに戻ります。
次に、メインファイバーは値を待っていたchannel.receive
で再開します。その後、メインファイバーは実行を続け、終了します。
上記の例では、ファイバーが終了したことを伝えるためにnil
を使用しました。チャネルを使用して、ファイバー間で値をやり取りすることもできます。
channel = Channel(Int32).new
spawn do
puts "Before first send"
channel.send(1)
puts "Before second send"
channel.send(2)
end
puts "Before first receive"
value = channel.receive
puts value # => 1
puts "Before second receive"
value = channel.receive
puts value # => 2
出力
Before first receive
Before first send
Before second send
1
Before second receive
2
プログラムがreceive
を実行すると、現在のファイバーがブロックされ、他のファイバーで実行が続行されることに注意してください。channel.send(1)
が実行されると、チャネルがいっぱいではない場合、send
はノンブロッキングであるため、実行が続行されます。しかし、channel.send(2)
は、チャネル(デフォルトではサイズが1)がいっぱいであるため、ファイバーをブロックし、そのチャネルを待っていたファイバーで実行が続行されます。
ここではリテラル値を送信していますが、生成されたファイバーは、たとえばファイルを読み取ったり、ソケットから取得したりすることで、この値を計算する可能性があります。このファイバーがI/Oを待機する必要がある場合、他のファイバーはI/Oの準備ができるまでコードの実行を続け、最終的に値の準備ができてチャネルを介して送信されると、メインファイバーがそれを受信します。例えば
require "socket"
channel = Channel(String).new
spawn do
server = TCPServer.new("0.0.0.0", 8080)
socket = server.accept
while line = socket.gets
channel.send(line)
end
end
spawn do
while line = gets
channel.send(line)
end
end
3.times do
puts channel.receive
end
上記のプログラムは2つのファイバーを生成します。最初のファイバーはTCPServerを作成し、1つの接続を受け入れ、そこから行を読み取り、チャネルに送信します。2番目のファイバーは標準入力から行を読み取ります。メインファイバーは、ソケットまたは標準入力のいずれかからチャネルに送信された最初の3つのメッセージを読み取り、プログラムは終了します。gets
呼び出しは、ファイバーをブロックし、データが到着した場合にそこでイベントループを続行するように指示します。
同様に、複数のファイバーの実行が完了するのを待って、それらの値を収集することもできます。
channel = Channel(Int32).new
10.times do |i|
spawn do
channel.send(i * 2)
end
end
sum = 0
10.times do
sum += channel.receive
end
puts sum # => 90
もちろん、生成されたファイバー内でreceive
を使用することもできます。
channel = Channel(Int32).new
spawn do
puts "Before send"
channel.send(1)
puts "After send"
end
spawn do
puts "Before receive"
puts channel.receive
puts "After receive"
end
puts "Before yield"
Fiber.yield
puts "After yield"
出力
Before yield
Before send
Before receive
1
After receive
After send
After yield
ここでは、最初にchannel.send
が実行されますが、値を待っているものがないため(まだ)、他のファイバーで実行が続行されます。2番目のファイバーが実行され、チャネルに値があるので、取得され、実行が続行されます。最初に最初のファイバー、次にメインファイバーで実行が続行されます。これは、Fiber.yield
がファイバーを実行キューの最後に配置するためです。
バッファ付きチャネル¶
上記の例では、バッファされていないチャネルを使用しています。値を送信する場合、ファイバーがそのチャネルを待っている場合、そのファイバーで実行が続行されます。
バッファ付きチャネルでは、send
を呼び出しても、バッファがいっぱいになるまで他のファイバーに切り替わりません。
# A buffered channel of capacity 2
channel = Channel(Int32).new(2)
spawn do
puts "Before send 1"
channel.send(1)
puts "Before send 2"
channel.send(2)
puts "Before send 3"
channel.send(3)
puts "After send"
end
3.times do |i|
puts channel.receive
end
出力
Before send 1
Before send 2
Before send 3
After send
1
2
3
最初のsend
はチャネルに空間を占有しません。これは、最初のsend
の前にreceive
が呼び出されているためです。一方、他の2つのsend
呼び出しは、それぞれのreceive
の前に発生します。send
呼び出しの数はバッファの境界を超えないため、send
ファイバーは中断されることなく完了まで実行されます。
バッファのすべての空間が占有される例を次に示します。
# A buffered channel of capacity 1
channel = Channel(Int32).new(1)
spawn do
puts "Before send 1"
channel.send(1)
puts "Before send 2"
channel.send(2)
puts "Before send 3"
channel.send(3)
puts "End of send fiber"
end
3.times do |i|
puts channel.receive
end
出力
Before send 1
Before send 2
Before send 3
1
2
3
「End of send fiber」が出力に表示されないのは、3つのsend
呼び出しをreceive
しているため、3.times
が完了まで実行され、結果としてメインファイバーのブロックが解除され、完了まで実行されるためです。
これは、まさに今見たスニペットと同じですが、一番下にFiber.yield
呼び出しが追加されています。
# A buffered channel of capacity 1
channel = Channel(Int32).new(1)
spawn do
puts "Before send 1"
channel.send(1)
puts "Before send 2"
channel.send(2)
puts "Before send 3"
channel.send(3)
puts "End of send fiber"
end
3.times do |i|
puts channel.receive
end
Fiber.yield
出力
Before send 1
Before send 2
Before send 3
1
2
3
End of send fiber
スニペットの最後にFiber.yield
呼び出しを追加することで、メインファイバーが完了まで実行されるために見逃されていた「End of send fiber」メッセージが出力に表示されます。