はじめに
ある処理の同時実行数を制限する方法は色々ある。
DBにデータをため、別プロセスがポーリングして制限数を超えるまでINSERT順にデータを取り出して処理するという方式が一般的かつ簡潔かもしれないが、DBとポーリング手法を使わずにJavaプログラムだけで制御してみた。
synchronized
を多用することになったため、synchronized
を使ったプログラムの排他制御、同期処理の良い練習になった。
要件
- WebでクライアントからあるURLにアクセスされると重い処理が走る。
- この重い処理を同時に実行できるのは1サーバにつき、20アクセスまでとする。
実装方針
Webからリクエストが来るということを考えると、タイムアウトを考慮しなくてはならず、長時間待たせるわけにはいかない。また、別プロセスでポーリングして処理を実行というのは、各スレッドがリアルタイムに処理をするWebアクセスでの実装に合わない。
そこで銀行の窓口になぞらえて実装するのがいいのではないかと考えた。
銀行では次のように窓口での受付を待つことになる。
- 整理券を受け取る。
- 待合席に座り、整理券番号が呼ばれるのを待つ。
また、次のような特性も併せ持つ。
- もし番号が呼ばれるのが遅ければ、一旦席を立ち、外に出て小用を済ませてもいい。
- 席を立って戻るのが遅ければ、番号がすでに呼ばれてしまっていることもあり、再度整理券を取る必要がある。
Webでアクセスされて動く各スレッドが、アプリケーションスコープで動くアクセス制限用クラスを通してメインの処理を行い、アクセス制限用クラスは銀行の窓口を模倣するという構成で実装していく。もし時間がかかりすぎるようであれば一旦レスポンスに整理券を入れて返してしまい、再度アクセスしてもらうようにする。
尚、今回の重いメイン処理は2秒間スリープするだけとする。
クラス構成
ソース | 説明 |
---|---|
AccessLimitUtil.java | 銀行の窓口を模倣するアクセス制限用クラス |
AccessLimitedException.java | アクセス制限にかかった時に投げられる例外 |
TestRunner.java | 重いメイン処理を走らせるテストクラス |
AccessLimitUtil
気を付けた点
アプリケーションスコープで動くSingletonクラスなので、各スレッドから同時アクセスされる。現在の処理数や新しく発行する整理券番号を取得する際にスレッド間の競合が起きないようにするため、synchronized
を使い、同期するようにする必要がある。
synchronized
を複数のオブジェクトに対して使っているが、気を付けたことは、予期せぬデッドロックを避けるため、処理の一連の流れの中でロックするオブジェクトが入れ子にならないようにすることである。次に書くソースのような、methodAでobjectAをsynchronizedブロックでロックしたまま、そのブロック内でobjectBをロックするmethodBを呼び出さないようにしている。
Object objectA = new Object();
Object objectB = new Object();
void methodA() {
synchronized(objectA) {
methodB();
}
}
void methodB() {
synchronized(objectB) {
}
}
ソース
では、本題のAccessLimitUtilを掲載する。
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
/**
* アクセス制限機能クラス
*/
public class AccessLimitUtil {
// Singleton start
private static Map<String, AccessLimitUtil> classnameToInstance = new HashMap<>();
private static Object lock = new Object();
protected AccessLimitUtil() {
synchronized (lock) {
String classname = this.getClass().getName();
if (classnameToInstance.get(classname) != null) {
throw new RuntimeException("Already created: " + classname);
}
classnameToInstance.put(classname, this);
}
}
public static AccessLimitUtil getInstance() {
return getInstance(AccessLimitUtil.class.getName());
}
public static AccessLimitUtil getInstance(String classname) {
synchronized (lock) {
AccessLimitUtil obj = (AccessLimitUtil) classnameToInstance.get(classname);
if (obj == null) {
try {
Class<?> cls = Class.forName(classname);
obj = (AccessLimitUtil) cls.newInstance();
} catch (ClassNotFoundException e) {
throw new RuntimeException(classname + " is not found");
} catch (IllegalAccessException e) {
throw new RuntimeException(classname + " cannot be accessed.");
} catch (InstantiationException e) {
throw new RuntimeException(classname + " cannot be instantiated.");
}
}
return obj;
}
}
// Singleton end
/**
* 処理実行中のスレッド数.
*/
protected AtomicInteger proceeding = new AtomicInteger();
/**
* 最大スレッド数.
*/
protected int max = 20;
/**
* 整理券の最新番号.
*/
protected long nTicketCurrent = System.currentTimeMillis();
/**
* 整理券の最新番号のロックオブジェクト.
*/
protected Object nTicketCurrentLock = new Object();
/**
* 整理券リスト.
*/
protected List<Long> nTicketList = new ArrayList<>();
/**
* 認証済み整理券リスト.
*/
protected List<Long> authorizedNTicketList = new ArrayList<>();
/**
* 認証期間.
*/
protected long authorizePeriod = 1000L * 60 * 60; // 1 hour
/**
* スリープ(ミリ秒).
*/
protected int sleep = 1000;
/**
* 最大スリープ(ミリ秒).
*/
protected int maxSleep = 1000 * 20;
/**
* スリープする.
* @param sleep ミリ秒
*/
protected void sleep(int sleep) {
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
return;
}
}
/**
* 整理券を発番する.
* @return 整理券
*/
protected Long issueNTicket() {
long nTicketNew = System.currentTimeMillis();
// 同時アクセスのとき、currentTimeMillisの精度では変化がつけられなかったら再帰。
boolean issued = false;
synchronized (nTicketCurrentLock) {
if (nTicketCurrent < nTicketNew) {
nTicketCurrent = nTicketNew;
issued = true;
}
}
if (!issued) {
sleep(100);
return issueNTicket();
}
addNTicketList(nTicketNew, true);
return nTicketNew;
}
/**
* 整理券を整理券リストに追加する.
* @param nTicket 整理券
* @param sort trueのとき、昇順並び替え
*/
protected void addNTicketList(Long nTicket, boolean sort) {
synchronized (nTicketList) {
nTicketList.add(nTicket);
if (sort) {
Collections.sort(nTicketList);
}
}
}
/**
* 受け付ける.
* @param nTicket 整理券
* @return 成否
*/
protected boolean accept(Long nTicket) {
// incrementAndGetすることで、1多く引くことになり、0始まりのListのindexに使える。
int acceptableIdx = max - proceeding.incrementAndGet();
if (0 <= acceptableIdx) {
synchronized (nTicketList) {
int idx = nTicketList.indexOf(nTicket);
if (0 <= idx && idx <= acceptableIdx) {
nTicketList.remove(nTicket);
return true;
}
}
}
// accept出来なかったので、最初にincrementした分を引いておく。
proceeding.decrementAndGet();
return false;
}
/**
* タイムアウト時の処理.
* @param nTicket 整理券
* @throws AccessLimitedException 引数に整理券がセットされる
*/
protected void timeout(Long nTicket) throws AccessLimitedException {
synchronized (nTicketList) {
nTicketList.remove(nTicket);
}
synchronized (authorizedNTicketList) {
authorizedNTicketList.add(nTicket);
}
throw new AccessLimitedException(nTicket);
}
/**
* 整理券の再登録.
* @param nTicket 整理券
* @return 整理券. 古い整理券を引数に渡した場合、新しく発番される可能性がある.
*/
protected Long returnedNTicket(Long nTicket) {
boolean authorized = false;
synchronized (authorizedNTicketList) {
authorized = authorizedNTicketList.remove(nTicket);
}
if (authorized) {
addNTicketList(nTicket, true);
return nTicket;
} else {
return issueNTicket();
}
}
/**
* 古い整理券を無効にする.
*/
protected void invalidateOldNTicket() {
long limit = System.currentTimeMillis() - authorizePeriod;
synchronized (authorizedNTicketList) {
Iterator<Long> ite = authorizedNTicketList.iterator();
while (ite.hasNext()) {
if (ite.next() < limit) {
ite.remove();
}
}
}
}
/**
* wait処理.
* @param nTicket 整理券
* @throws AccessLimitedException 引数に整理券がセットされる
*/
protected void wait(Long nTicket) throws AccessLimitedException {
for (int i = 0; sleep * i < maxSleep; i++) {
if (accept(nTicket)) {
return;
} else {
sleep(sleep);
}
}
timeout(nTicket);
}
/**
* アクセス実行メソッド.
* @param <T> 処理実行関数に渡す引数型
* @param <R> 処理実行関数の戻り値型
* @param accessLimitable 処理実行関数
* @param paramDto 処理実行関数に渡す引数
* @param nTicket 整理券 nullの場合新規発行し、nullでない場合再登録を試みる
* @throws AccessLimitedException 引数に整理券がセットされる
*/
public <T, R> R access(Function<T, R> accessLimitable, T param, Long nTicket) throws AccessLimitedException {
nTicket = nTicket == null ? issueNTicket() : returnedNTicket(nTicket);
wait(nTicket);
try {
System.out.println("current threads: " + proceeding.get());
return accessLimitable.apply(param);
} finally {
proceeding.decrementAndGet();
invalidateOldNTicket();
}
}
}
ソース解説
Singleton
はじめのSingleton startからSingleton endまでで、Singletonクラスを実現している。普通のSingletonの作り方より複雑なのは、このクラスを継承して、アクセス制御の仕方やアクセス時間等をカスタマイズ可能にするため。普通のSingletonはコンストラクタをprivate
で宣言するため、継承ができない。継承可能なSingletonの作り方は結城浩 氏のSingletonのサブクラス化を参考にさせていただいた。
ロジック
ロジックは今まで書いた通り、銀行の窓口の仕組みを実装している。
access
メソッドが公開しているクラスである。引数に、処理を実行する関数とその関数への引数を受け取っている。処理実行部分を外部から受け取ることで、アクセス制御の仕組みをどんな処理にでも適用できるようにしている。
最後の引数nTicket
(numbered ticket, 整理券)は通常nullを設定する。銀行の窓口待ちでいうところの、「外から戻ってきて整理券がまだ有効かチェックする」ときだけ、値を設定する。
ロジックの流れは次のようになる。
- 整理券を持っているか確認(nTicket == null ?)。
- 持っていない -> 整理券を発行。整理券は現在時刻(long)を採用する。
- 持っている -> 整理券を再登録。古ければ新規発行。
- 整理券をもって順番待ち。一定時間sleepしながら制限時間いっぱいまで順番がきたか確認する。
- 制限時間を超えた場合、例外を投げる。例外には整理券番号をセットする。
- 受付された場合、現在の処理スレッド数をカウントアップする。
- メインの処理を実行する。
- 現在の処理スレッド数をカウントダウンする。
- 管理している整理券のうち、古い整理券を無効とする。
AccessLimitedException
ソース
public class AccessLimitedException extends Exception {
private Long nTicket;
public AccessLimitedException(Long nTicket) {
super();
this.setNTicket(nTicket);
}
public Long getNTicket() {
return nTicket;
}
public void setNTicket(Long nTicket) {
this.nTicket = nTicket;
}
}
ソース解説
エラーメッセージ等を格納する通常のExceptionを拡張して、整理券を格納できるようにしている。
TestRunner
ソース
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
public class TestRunner {
public static void main(String[] args) {
int parallel = 1000;
ExecutorService ex = Executors.newFixedThreadPool(parallel);
for (int i = 0; i < parallel; i++) {
ex.execute(new Test());
}
ex.shutdown();
}
}
class Test implements Runnable {
Long nTicket;
@Override
public void run() {
Function<Param, Void> f = param -> {
// 時間のかかる処理
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return null;
};
Param param = new Param();
try {
AccessLimitUtil.getInstance().access(f, param, nTicket);
} catch (AccessLimitedException e) {
if (nTicket != null) {
System.out.println("failed again");
} else {
System.out.println("failed first time");
}
nTicket = e.getNTicket();
run();
}
}
}
class Param {
// some fields
}
ソース解説
main
メソッドから並列で1000スレッドが処理を同時に実行しようとしている。
AccessLimitUtil
を噛まして実行することで、並列で20スレッド分しか実行されず、残りのスレッドは処理可能になるまで順番を待つ。タイムアウトしてしまえばAccessLimitedException
が飛んでくるので、例外から整理券を取得し、再度処理実行を試みる。
これを実行し、AccessLimitUtil
にログ取得のため忍ばせておいた現在のスレッド数を確認する式System.out.println("current threads: " + proceeding.get());
の出力結果をみると、必ず20スレッドまでしかカウントが伸びないことが分かる。
タイムアウトした時は再実行してくれる。Webで実際に実装するとしたら、レスポンスに整理券番号をセットしてJSONで返すなどして、Ajaxで再帰処理をするのがいいだろう。
function access(nTicket) {
nTicket = typeof nTicket === 'undefined' ? null : nTicket;
$.ajax({
type : 'post',
dataType : 'json',
url : '/access',
cache : false,
data : {
"nTicket" : nTicket
, 省略
},
success : function(json){
if (json.nTicket) {
// 再帰
access(json.nTicket);
} else {
alert('成功');
}
},
error : function(jqXHR ,test,error){
alert('エラー');
}
});
}
access();