https://docs.oracle.com/cd/E57425_01/121/ADQUE/aq_intro.htm
環境
- Oracle Database 23ai Free Release 23.0.0.0.0
- A5:SQL Mk-2 Version 2.20.0
docker run --name oracle_queue -p 11521:1521 -e ORACLE_PWD="Oracle23" container-registry.oracle.com/database/free:23.5.0.0-lite
説明
設定
以下の記事を参考にサンプル用のキューなどを作成していく。基本的に、各種設定用のSQLなどは以下の記事をコピペさせていただいている。
https://qiita.com/kjmtgm/items/bb31794a7e780c853e29
以下のSQLは説明のための手抜きでSYSTEM
で作成する。実際には特定のユーザに権限付与したりなどをすると思われる。
メッセージ送受信用の構造型を作成する。
CREATE TYPE MYMESG AS OBJECT ( ID NUMBER(10,0), MESG VARCHAR2(30) );
キューのためのテーブルを作成する。
begin DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table => 'MYMESG_TAB', queue_payload_type => 'MYMESG', multiple_consumers => TRUE ); end;
上で作成したキュー表に対してキューを作成する。
begin DBMS_AQADM.CREATE_QUEUE( queue_name => 'MYMESG_Q', queue_table => 'MYMESG_TAB' ); end;
キューを開始する。
begin DBMS_AQADM.START_QUEUE ( queue_name => 'MYMESG_Q' ); end;
サブスクライバを追加する。
begin DBMS_AQADM.ADD_SUBSCRIBER( queue_name => 'MYMESG_Q', subscriber => SYS.AQ$_AGENT('SUBSCR',NULL,NULL), rule => NULL ); END;
事前準備はこれで完了。
キューに対する操作はストアドで行う。
エンキュー(メッセージ送信)する。
DECLARE mesg MYMESG; enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); BEGIN mesg := MYMESG( 1 ,'こんにちは' ); dbms_aq.enqueue(queue_name => 'MYMESG_Q', enqueue_options => enqueue_options, message_properties => message_properties, payload => mesg, msgid => message_handle ); mesg := MYMESG( 3 ,'今晩は' ); dbms_aq.enqueue(queue_name => 'MYMESG_Q', enqueue_options => enqueue_options, message_properties => message_properties, payload => mesg, msgid => message_handle ); COMMIT; END;
デキュー(メッセージ受信)する。
declare mesg MYMESG; dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T; message_properties dbms_aq.message_properties_t; message_handle RAW(16); begin DBMS_OUTPUT.ENABLE(buffer_size => NULL); dequeue_options.CONSUMER_NAME := 'SUBSCR'; DBMS_AQ.DEQUEUE( queue_name => 'MYMESG_Q', dequeue_options => dequeue_options, message_properties => message_properties, payload => mesg, msgid => message_handle ); DBMS_OUTPUT.PUT_LINE(mesg.ID || mesg.MESG); end;
Javaからアクセス
spring-bootアプリケーションからこのキューに対する送受信を行う。
plugins { id 'java' id 'org.springframework.boot' version '3.4.1' id 'io.spring.dependency-management' version '1.1.7' } group = 'com.example' version = '0.0.1-SNAPSHOT' java { toolchain { languageVersion = JavaLanguageVersion.of(17) } } configurations { compileOnly { extendsFrom annotationProcessor } } repositories { mavenCentral() } dependencies { implementation 'org.springframework.boot:spring-boot-starter-data-jpa' compileOnly 'org.projectlombok:lombok' developmentOnly 'org.springframework.boot:spring-boot-devtools' runtimeOnly 'com.oracle.database.jdbc:ojdbc11' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' } tasks.named('test') { useJUnitPlatform() }
以下は送信側のサンプルコード。
import java.sql.Types; import java.util.List; import java.util.Map; import javax.sql.DataSource; import lombok.RequiredArgsConstructor; import org.springframework.jdbc.core.CallableStatementCreator; import org.springframework.jdbc.core.CallableStatementCreatorFactory; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.SqlParameter; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @Service @RequiredArgsConstructor public class OracleStoreadQueue { final DataSource ds; @Transactional public void enqueue() { List<SqlParameter> params = List.of( new SqlParameter("id", Types.INTEGER), new SqlParameter("message", Types.VARCHAR)); CallableStatementCreatorFactory factory = new CallableStatementCreatorFactory(""" DECLARE mesg MYMESG; enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); BEGIN mesg := MYMESG(? ,?); dbms_aq.enqueue(queue_name => 'MYMESG_Q', enqueue_options => enqueue_options, message_properties => message_properties, payload => mesg, msgid => message_handle ); END; """, params); CallableStatementCreator creator = factory.newCallableStatementCreator( Map.of("id", 1, "message", "Javaから送信")); JdbcTemplate jdbc = new JdbcTemplate(ds); Map<String, Object> result = jdbc.call( creator, params); System.out.println(result); jdbc.execute("insert into sample_table(sample_id, sample_value) values ('aa', 1)"); } }
Javaからストアドを呼び出すのがちょっとややこしい。
興味深いのは、トランザクションにメッセージ送信を含められる点。上記サンプルコードでは別テーブルへのinsert
を含めておりこれは意図通りに動作する。つまり、commitが正常終了した時のみメッセージ送信とinsert
の両方が成功する。rollbackすれば両方とも取り消される。
以下は受信側。
まず、ややこしいストアド呼出をラップするストアドを作成する。
CREATE OR REPLACE function dequeue_sample1 return VARCHAR2 IS mesg MYMESG; dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T; message_properties dbms_aq.message_properties_t; message_handle RAW(16); BEGIN dequeue_options.CONSUMER_NAME := 'SUBSCR'; DBMS_AQ.DEQUEUE( queue_name => 'MYMESG_Q', dequeue_options => dequeue_options, message_properties => message_properties, payload => mesg, msgid => message_handle); return mesg.ID || mesg.MESG; END dequeue_sample1;
適当なスケジューラを作成する。
import lombok.RequiredArgsConstructor; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Component @RequiredArgsConstructor public class SampleScheduledComponent { final DequeueSample service; @Scheduled(fixedRate = 1000) public void execute() { service.dequeue(); } }
受信の本体部分。
package expr.app.oraclequeur; import javax.sql.DataSource; import lombok.RequiredArgsConstructor; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.simple.SimpleJdbcCall; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @Component @RequiredArgsConstructor public class DequeueSample { final DataSource ds; @Transactional public void dequeue() { SimpleJdbcCall stored = new SimpleJdbcCall(ds) .withSchemaName("SYSTEM") .withFunctionName("dequeue_sample1"); String message = stored.executeFunction(String.class); System.out.println("de-queue:" + message); JdbcTemplate jdbc = new JdbcTemplate(ds); jdbc.execute("insert into sample_table(sample_id, sample_value) values ('aa', 1)"); } }
上で作成したラップのストアドを呼び出してメッセージを受信する。こちらにもトランザクションのサンプルコードとしてinsert
を含めている。これも意図通りの挙動になる。
その他
感想とか
アプリケーション観点から見て特筆すべきは、やはり、単一トランザクションにDB更新とメッセージ送信を含められる点だろう。何故かというとoutboxパターン(transactional-outbox)とかいうのがある程度にはRDB + MQはそれなりに頭の痛い課題である。この問題をOracleに丸投げ出来るのであればそれなりに魅力的である。
ただ、何もかもOracleに集中するのもまた問題のように思われる。すべての負荷がOracleに集中するのは問題になりそうだとか、専用のメッセージングミドルウェアの方がおそらく性能は出るだろうなぁとか。
あと、インターネットにはあんま情報が無い……いまどきストアドに馴染みのある開発者も少ないだろうし、若干の敷居の高さが無いでは無い。
とはいえ、メッセージングのあれこれをOracleだけで完結&丸投げ出来るのは良い。
com.oracle.database.messaging:aqapiは使わなくて良い?
https://mvnrepository.com/artifact/com.oracle.database.messaging/aqapi
このキューにアクセスするためと思しきライブラリが存在するが、基本的にはストアド実行で十分と思われるので、使わなくても良いのではなかろうか。また、Oracle's implementation of JMS specification in compliance with JMS 1.1
という記述もあり、JMSの場合に使うものなのかな? という気がする。
ハマった点
ORA-24033: no recipients for message
サブスクライバを登録してからでないとエンキューは出来ない。他のメッセージングミドルウェアと比較するとちょっと融通が利かないような気もする。Oracle内臓だから仕方ないのだろうか?
https://docs.oracle.com/cd/E16338_01/server.112/b61355/aq_trbl.htm#i1005947
参考URL
- https://qiita.com/kjmtgm/items/bb31794a7e780c853e29
- DBMS_AQADM https://docs.oracle.com/cd/E16338_01/appdev.112/b56262/d_aqadm.htm
- DBMS_AQ https://docs.oracle.com/cd/E16338_01/appdev.112/b56262/d_aq.htm#i1001648
- https://docs.oracle.com/en/database/oracle/oracle-database/21/arpls/advanced-queuing-AQ-types.html
- https://docs.oracle.com/cd/E16338_01/server.112/b61355/aq_trbl.htm