kagamihogeの日記

kagamihogeの日記です。

Oracle Database Advanced Queuingをさわる

https://docs.oracle.com/cd/E57425_01/121/ADQUE/aq_intro.htm

環境

docker run --name oracle_queue -p 11521:1521 -e ORACLE_PWD="Oracle23" container-registry.oracle.com/database/free:23.5.0.0-lite

説明

設定

以下の記事を参考にサンプル用のキューなどを作成していく。基本的に、各種設定用のSQLなどは以下の記事をコピペさせていただいている。

https://qiita.com/kjmtgm/items/bb31794a7e780c853e29

以下のSQLは説明のための手抜きでSYSTEMで作成する。実際には特定のユーザに権限付与したりなどをすると思われる。

メッセージ送受信用の構造型を作成する。

CREATE TYPE MYMESG AS OBJECT (
  ID NUMBER(10,0),
  MESG  VARCHAR2(30)
);

キューのためのテーブルを作成する。

begin
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'MYMESG_TAB',
    queue_payload_type => 'MYMESG',
    multiple_consumers => TRUE
  );
end;

上で作成したキュー表に対してキューを作成する。

begin
  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => 'MYMESG_Q',
    queue_table => 'MYMESG_TAB'
  );
end;

キューを開始する。

begin
  DBMS_AQADM.START_QUEUE (
    queue_name  => 'MYMESG_Q'
  );
end;

サブスクライバを追加する。

begin
  DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name  => 'MYMESG_Q',
    subscriber  => SYS.AQ$_AGENT('SUBSCR',NULL,NULL),
    rule        => NULL
  );
END;

事前準備はこれで完了。

キューに対する操作はストアドで行う。

エンキュー(メッセージ送信)する。

DECLARE
    mesg MYMESG;
    enqueue_options     dbms_aq.enqueue_options_t;
    message_properties  dbms_aq.message_properties_t;
    message_handle      RAW(16);
BEGIN
    mesg := MYMESG( 1 ,'こんにちは' );
    dbms_aq.enqueue(queue_name         => 'MYMESG_Q',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => mesg,
                    msgid              => message_handle
    );

    mesg := MYMESG( 3 ,'今晩は' );
    dbms_aq.enqueue(queue_name         => 'MYMESG_Q',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => mesg,
                    msgid              => message_handle
    );
    COMMIT;
END;

デキュー(メッセージ受信)する。

declare
  mesg MYMESG;
  dequeue_options     DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties  dbms_aq.message_properties_t;
  message_handle      RAW(16);
begin
  DBMS_OUTPUT.ENABLE(buffer_size => NULL); 
  
  dequeue_options.CONSUMER_NAME := 'SUBSCR';
  DBMS_AQ.DEQUEUE(
    queue_name         => 'MYMESG_Q',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => mesg,
    msgid              => message_handle
  );
  
  DBMS_OUTPUT.PUT_LINE(mesg.ID || mesg.MESG);
end;

Javaからアクセス

spring-bootアプリケーションからこのキューに対する送受信を行う。

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.4.1'
    id 'io.spring.dependency-management' version '1.1.7'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(17)
    }
}

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    compileOnly 'org.projectlombok:lombok'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    runtimeOnly 'com.oracle.database.jdbc:ojdbc11'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

tasks.named('test') {
    useJUnitPlatform()
}

以下は送信側のサンプルコード。

import java.sql.Types;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.core.CallableStatementCreator;
import org.springframework.jdbc.core.CallableStatementCreatorFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.SqlParameter;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@RequiredArgsConstructor
public class OracleStoreadQueue {

  final DataSource ds;

  @Transactional
  public void enqueue() {
    List<SqlParameter> params = List.of(
        new SqlParameter("id", Types.INTEGER),
        new SqlParameter("message", Types.VARCHAR));
    CallableStatementCreatorFactory factory = new CallableStatementCreatorFactory("""
        DECLARE
            mesg MYMESG;
            enqueue_options     dbms_aq.enqueue_options_t;
            message_properties  dbms_aq.message_properties_t;
            message_handle      RAW(16);
        BEGIN
            mesg := MYMESG(? ,?);
            dbms_aq.enqueue(queue_name         => 'MYMESG_Q',
                            enqueue_options    => enqueue_options,
                            message_properties => message_properties,
                            payload            => mesg,
                            msgid              => message_handle
            );
        END;
        """, params);
    CallableStatementCreator creator = factory.newCallableStatementCreator(
        Map.of("id", 1, "message", "Javaから送信"));
    JdbcTemplate jdbc = new JdbcTemplate(ds);

    Map<String, Object> result = jdbc.call(
        creator,
        params);
    System.out.println(result);

    jdbc.execute("insert into sample_table(sample_id, sample_value) values ('aa', 1)");
  }
}

Javaからストアドを呼び出すのがちょっとややこしい。

興味深いのは、トランザクションにメッセージ送信を含められる点。上記サンプルコードでは別テーブルへのinsertを含めておりこれは意図通りに動作する。つまり、commitが正常終了した時のみメッセージ送信とinsertの両方が成功する。rollbackすれば両方とも取り消される。

以下は受信側。

まず、ややこしいストアド呼出をラップするストアドを作成する。

CREATE OR REPLACE function dequeue_sample1
return VARCHAR2
IS
  mesg MYMESG; 
  dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T; 
  message_properties dbms_aq.message_properties_t; 
  message_handle     RAW(16); 
BEGIN
  dequeue_options.CONSUMER_NAME := 'SUBSCR'; 
  DBMS_AQ.DEQUEUE(
    queue_name         => 'MYMESG_Q',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => mesg,
    msgid              => message_handle);
  return mesg.ID || mesg.MESG;
END dequeue_sample1;

適当なスケジューラを作成する。

import lombok.RequiredArgsConstructor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class SampleScheduledComponent {

  final DequeueSample service;

  @Scheduled(fixedRate = 1000)
  public void execute() {
    service.dequeue();
  }
}

受信の本体部分。

package expr.app.oraclequeur;

import javax.sql.DataSource;
import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.simple.SimpleJdbcCall;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@RequiredArgsConstructor
public class DequeueSample {

  final DataSource ds;

  @Transactional
  public void dequeue() {
    SimpleJdbcCall stored = new SimpleJdbcCall(ds)
        .withSchemaName("SYSTEM")
        .withFunctionName("dequeue_sample1");

    String message = stored.executeFunction(String.class);
    System.out.println("de-queue:" + message);

    JdbcTemplate jdbc = new JdbcTemplate(ds);
    jdbc.execute("insert into sample_table(sample_id, sample_value) values ('aa', 1)");

  }
}

上で作成したラップのストアドを呼び出してメッセージを受信する。こちらにもトランザクションのサンプルコードとしてinsertを含めている。これも意図通りの挙動になる。

その他

感想とか

アプリケーション観点から見て特筆すべきは、やはり、単一トランザクションにDB更新とメッセージ送信を含められる点だろう。何故かというとoutboxパターン(transactional-outbox)とかいうのがある程度にはRDB + MQはそれなりに頭の痛い課題である。この問題をOracleに丸投げ出来るのであればそれなりに魅力的である。

ただ、何もかもOracleに集中するのもまた問題のように思われる。すべての負荷がOracleに集中するのは問題になりそうだとか、専用のメッセージングミドルウェアの方がおそらく性能は出るだろうなぁとか。

あと、インターネットにはあんま情報が無い……いまどきストアドに馴染みのある開発者も少ないだろうし、若干の敷居の高さが無いでは無い。

とはいえ、メッセージングのあれこれをOracleだけで完結&丸投げ出来るのは良い。

com.oracle.database.messaging:aqapiは使わなくて良い?

https://mvnrepository.com/artifact/com.oracle.database.messaging/aqapi

このキューにアクセスするためと思しきライブラリが存在するが、基本的にはストアド実行で十分と思われるので、使わなくても良いのではなかろうか。また、Oracle's implementation of JMS specification in compliance with JMS 1.1 という記述もあり、JMSの場合に使うものなのかな? という気がする。

ハマった点

ORA-24033: no recipients for message

サブスクライバを登録してからでないとエンキューは出来ない。他のメッセージングミドルウェアと比較するとちょっと融通が利かないような気もする。Oracle内臓だから仕方ないのだろうか?

https://docs.oracle.com/cd/E16338_01/server.112/b61355/aq_trbl.htm#i1005947

参考URL