デバイスドライバを用いたプロセス間通信

だいぶ前から、こんなことに興味があった。

  • プロセス間通信って色々あるけど、せいぜい100バイトぐらいのメッセージをやり取りするのに一番軽いのって何?
  • プロセス間通信のログ取得も簡単にON/OFFしたい
    デバッグ/評価/市場トラブル時にアプリケーションに手を入れることなくプロセス間通信のログが取れるのは非常にありがたい。
  • シェルスクリプトからも各プロセスに指示が出せるとうれしい

今から、2年近くも前だろうか、以下の記事を読ませて頂いた。
http://cheesy.dip.jp/tutorialog/archives/6


これを参考に、以下のようなプロセス間通信を実装した。

  • 共有メモリを使うのではなく、メッセージデータをデバイスドライバにread(2)/write(2)する
    上記したような用途(100バイト程度のメッセージ)ならデバイスドライバでメッセージをコピーしてもあまりスループットに影響しないのではないか?と予想。
    プロセス間通信のログを取るために、read/writeする、という意味もある。
  • /procfs を使って、プロセス間通信のログを簡単に保存できるようにする。


(実行に移すのにえらく時間がかかったなぁ・・・)。

以下、サンプルとして、
2プロセス間で128バイトのメッセージを送受信を繰り返すプログラム書く。
まずは、上記した記事のプログラムを参考に共有メモリ方式でのプログラム、
次に、共有メモリ方式ではなく、メッセージをデバイスドライバにread/writeするプログラム、
を準備し、結果の比較をする。

  1. デバイスドライバ+共有メモリを用いたプログラム

http://cheesy.dip.jp/tutorialog/archives/6 をベースに、連続でメッセージの送受信するように修正したプログラム。変更したのは、ワークをプロセス数分用意して、read(2)/write(2)時に相手のキューを検索して、wake_up_interruptible_sync()を呼ぶようにした点。
wake_up_interruptible_sync()を呼ぶようにした理由は、こっちの方が高速に動作したから。後々、他のプロセス間通信との速度比較もしてみたいため、wake_up_interruptible_sync()を使っていく。

まずは、改造したデバイスドライバ(ipc_shmem.c)

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/fs.h>
#include <linux/poll.h>

#define DEV_NAME "ipc_shmem" // デバイス名称
#define IPC_MAJOR 100        // メジャー番号

struct IPC_DEVICE {
        int use;
        int is_readable;
        wait_queue_head_t write_q;  // write用のキュー
        wait_queue_head_t read_q;   // read用のキュー
};

static struct IPC_DEVICE *ipc_devs; // デバイスのワーク
static int ipc_devs_nr = 2;         // 用意するデバイス数
static struct semaphore ipc_sem;    // ipc_devs 操作用のセマフォ

static int
ipc_open(struct inode* inode, struct file* filp)
{
        struct IPC_DEVICE *dev=NULL;
        int i;

        // 空いているデバイスを見つける
        for(i=0; i<ipc_devs_nr; i++) {
                dev = ipc_devs + i;
                if (!dev->use)
                        break;   // 空きデバイスが見つかった
        }
        if (i == ipc_devs_nr) {  // 全てのデバイスが使用中
                return -EBUSY;
        }

        if (down_interruptible(&ipc_sem)) {
                printk(KERN_INFO "down_interruptible for read failed\n");
                return -ERESTARTSYS;
        }
        dev->use = 1;
        dev->is_readable = 0;
        up(&ipc_sem);

        // ワークを保存しておく
        filp->private_data = (void *)dev;

        return 0;
}

static int
ipc_close(struct inode* inode, struct file* filp)
{
        struct IPC_DEVICE *dev = (struct IPC_DEVICE *)filp->private_data;

        if (down_interruptible(&ipc_sem)) {
                printk(KERN_INFO "down_interruptible for read failed\n");
                return -ERESTARTSYS;
        }
        dev->use = 0;
        up(&ipc_sem);

        return 0;
}

static ssize_t
ipc_read(struct file* filp, char* buf, size_t count, loff_t* pos)
{
        struct IPC_DEVICE *dev = (struct IPC_DEVICE *)filp->private_data;
        struct IPC_DEVICE *dst;
        int i;

        if (down_interruptible(&ipc_sem)) {
                printk(KERN_INFO "down_interruptible for read failed\n");
                return -ERESTARTSYS;
        }
        dev->is_readable = 0;
        for(i=0; i<ipc_devs_nr; i++) {
                dst = ipc_devs + i;

                // 自分じゃないデバイス=書き込みしてきたデバイスを探す
                if (dst != dev && dst->use) {
                        // wake_up 関数中で相手プロセスを起こさないよう、_sync を使う
                        wake_up_interruptible_sync(&dst->write_q);
                        break;
                }
        }
        up(&ipc_sem);

        return 0;
}

static ssize_t
ipc_write(struct file* filp, const char* buf, size_t count, loff_t* pos)
{
        struct IPC_DEVICE *dev = (struct IPC_DEVICE *)filp->private_data;
        struct IPC_DEVICE *dst;
        int i;

        if (down_interruptible(&ipc_sem)) {
                printk(KERN_INFO "down_interruptible for write failed\n");
                return -ERESTARTSYS;
        }
        for(i=0; i<ipc_devs_nr; i++) {
                dst = ipc_devs + i;

                // 自分じゃないデバイス=書き込み先デバイスを探す
                if (dst != dev && dst->use) {
                        dst->is_readable = 1;

                        // wake_up 関数中で相手プロセスを起こさないよう、_sync を使う
                        wake_up_interruptible_sync(&dst->read_q);
                        break;
                }
        }
        up(&ipc_sem);

        return 0;
}

static unsigned int
ipc_poll(struct file* filp, poll_table* wait)
{
        struct IPC_DEVICE *dev = (struct IPC_DEVICE *)filp->private_data;
        unsigned int retmask = 0;

        poll_wait(filp, &dev->read_q,  wait);
        poll_wait(filp, &dev->write_q, wait);

        if (dev->is_readable) {
                retmask |= (POLLIN | POLLRDNORM);
        }
        if (!dev->is_readable) {
                retmask |= (POLLOUT | POLLWRNORM);
        }

        return retmask;
}

static struct file_operations ipc_fops =
{
        .owner   = THIS_MODULE,
        .read    = ipc_read,
        .write   = ipc_write,
        .poll    = ipc_poll,
        .open    = ipc_open,
        .release = ipc_close,
};

int
init_module(void)
{
        struct IPC_DEVICE *dev;
        int i;

        if (register_chrdev(IPC_MAJOR, DEV_NAME, &ipc_fops)) {
                printk(KERN_INFO "register_chrdev failed\n");
                return -EBUSY;
        }

        // 全デバイス分のワークエリアを確保
        ipc_devs = (struct IPC_DEVICE *)kmalloc((sizeof(*dev) * ipc_devs_nr), GFP_KERNEL);
        if (!ipc_devs) {
                return -ENOMEM;
        }

        for(i=0; i<ipc_devs_nr; i++) {
                dev = ipc_devs + i;
                dev->use = 0;
                dev->is_readable = 0;
                init_waitqueue_head(&dev->write_q);
                init_waitqueue_head(&dev->read_q);
                printk(KERN_INFO "init %d\n", i);
        }
        sema_init(&ipc_sem, 1);

        return 0;
}

void
cleanup_module(void)
{
        if (unregister_chrdev(IPC_MAJOR, DEV_NAME) ) {
                printk(KERN_INFO "unregister_chrdev failed\n");
        }
        kfree(ipc_devs);
}


Makefile はこちら。

obj-m := ipc_shmem.o

all:
        make -C /lib/modules/$(shell uname -r)/build M=`pwd` V=1 modules

clean:
        make -C /lib/modules/$(shell uname -r)/build M=$(PWD) clean

まずは、デバイスファイルを作成する。

# /bin/mknod /dev/ipc_shmem c 100 0
# chmod 777 /dev/ipc_shmem

次に、コンパイルして、ipc_shmem.ko をカーネルに組み込む。

# make
... 出力結果は省略 ...
# /sbin/insmod ipc_shmem.ko

次は動作確認用のユーザープロセス(rd.c)。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/select.h>
#include <fcntl.h>

#define DPRINTF(...)
//#define DPRINTF(...) printf(__VA_ARGS__)

#define BUFSIZE 1024

int main(int argc, char *argv[])
{
        int pagesize, mapsize;
        int fd, dev;
        char *map;
        fd_set rfds, wfds;
        char buf[128];
        int read_start;

        pagesize = sysconf(_SC_PAGE_SIZE);
        mapsize = ((BUFSIZE-1)/pagesize+1) * pagesize;

        if((fd = open("./map.shm", O_RDWR|O_CREAT, 0666)) == -1) {
                perror("open map");
                exit(-1);
        }
        if ((dev = open("/dev/ipc_shmem", O_RDWR)) == -1) {
                perror("open ipc_shmem");
                exit(-1);
        }
        if (ftruncate(fd, mapsize) == -1) {
                perror("ftruncate");
                exit(-1);
        }
        map = (char *) mmap(0, mapsize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
       if (map == MAP_FAILED) {
                perror("mmap");
                return -1;
        }

        read_start=1;   // メッセージのreadから開始
        while (1) {
                if (read_start) {
                        FD_ZERO(&rfds);
                        FD_SET(dev, &rfds);

                        DPRINTF("selecting read...\n");
                        if (select(dev+1, &rfds, NULL, NULL, NULL) == -1) {
                                perror("select");
                        }

                        if (FD_ISSET(dev, &rfds)) {
                                DPRINTF("read [%s]\n", map);  // 受信メッセージの表示
                                read(dev, buf, sizeof(buf));
                        }
                        read_start = 0;
                } else {
                        memset(buf, 'r', sizeof(buf));   // 送信メッセージの作成
                        buf[sizeof(buf)-1] = '\0';

                        FD_ZERO(&wfds);
                        FD_SET(dev, &wfds);
                        DPRINTF("selecting write...\n");
                        if (select(dev+1, NULL, &wfds, NULL, NULL) == -1) {
                                perror("select");
                                exit(-1);
                        }

                        if (FD_ISSET(dev, &wfds)) {
                                strncpy(map, buf, sizeof(buf));
                                if (write(dev, NULL, 1) == -1) {
                                        perror("write");
                                        exit(-1);
                                }
                        }
                       read_start = 1;
//                      sleep(1);
                }
        }

        munmap(map, mapsize);
        close(fd);
        close(dev);

        return 0;
}

次は動作確認用のユーザープロセス(wr.c)。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/select.h>
#include <fcntl.h>

#define DPRINTF(...)
//#define DPRINTF(...) printf(__VA_ARGS__)

#define BUFSIZE 1024

int main(int argc, char *argv[])
{
        int pagesize, mapsize;
        int fd, dev;
        char *map;
        fd_set rfds, wfds;
        char buf[128];
        int read_start, i, nloop;

        if (argc != 2) {
                printf("usage:\n./wr <NLOOP>\n");
                exit(-1);
        }
        nloop = atoi(argv[1]);

        pagesize = sysconf(_SC_PAGE_SIZE);
        mapsize = ((BUFSIZE-1)/pagesize+1) * pagesize;

        if((fd = open("./map.shm", O_RDWR|O_CREAT, 0666)) == -1) {
                perror("open map");
                exit(-1);
        }
        if ((dev = open("/dev/ipc_shmem", O_RDWR)) == -1) {
                perror("open ipc_shmem");
                exit(-1);
       }
        if (ftruncate(fd, mapsize) == -1) {
                perror("ftruncate");
                exit(-1);
        }
        map = (char *) mmap(0, mapsize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
        if (map == MAP_FAILED) {
                perror("mmap");
                exit(-1);
        }

        read_start=0;   // メッセージのwriteから開始
        for (i=0; i<nloop; i++) {
                if (read_start) {
                        FD_ZERO(&rfds);
                        FD_SET(dev, &rfds);

                        DPRINTF("selecting read...\n");
                        if (select(dev+1, &rfds, NULL, NULL, NULL) == -1) {
                                perror("select");
                        }

                        if (FD_ISSET(dev, &rfds)) {
                                DPRINTF("read [%s]\n", map);  // 受信メッセージの表示
                                read(dev, buf, sizeof(buf));  // メッセージ領域を解放
                        }
                        read_start = 0;
                } else {
                        memset(buf, 'w', sizeof(buf));  // 送信メッセージの作成
                        buf[sizeof(buf)-1] = '\0';

                        FD_ZERO(&wfds);
                        FD_SET(dev, &wfds);
                        DPRINTF("selecting write...\n");
                        if (select(dev+1, NULL, &wfds, NULL, NULL) == -1) {
                                perror("select");
                                exit(-1);
                        }

                        if (FD_ISSET(dev, &wfds)) {
                               strncpy(map, buf, sizeof(buf));
                                if (write(dev, NULL, 1) == -1) {  // メッセージの送信
                                        perror("write");
                                        exit(-1);
                                }
                        }
                        read_start = 1;
//                      sleep(1);
                }
        }
        printf("write loop = %d\n", i/2);
        munmap(map, mapsize);
        close(fd);
        close(dev);

        return 0;
}
all:
        gcc -g -o wr wr.c
        gcc -g -o rd rd.c
clean:
        rm -f *.o wr rd

早速実行して、プロセス間通信を100000回ぐらいやった時の時間を測定する。
測定PCは、VMWarePlayer上のFedoraCore4(CPU:Pentium4 メモリ割当:256MB)、ホストOSは、WinXP HomeEdition メモリ1GB。
結果をみても、ちょっとした組込み機器ぐらいの能力しかないなぁ。そのうち、Linuxのネイティブ機で測定しなおそう。

$ make
gcc -g -o wr wr.c
gcc -g -o rd rd.c
$ ./rd&
$ time ./wr 100000
write loop = 50000

real    0m5.535s
user    0m0.008s
sys     0m2.784s
  1. デバイスドライバにRead/Writeするプログラム

次に、共有メモリではなく、デバイスドライバに対してメッセージをread(2)、write(2)することでプロセス間通信するプログラムを書く。

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/fs.h>
#include <linux/proc_fs.h>
#include <linux/poll.h>

#define DPRINTK(...)
//#define DPRINTK(...) printk(__VA_ARGS__)

#define DEV_NAME "ipc"              // デバイス名称
#define PROC_NAME "driver/ipc"      // procfs 上の名称。/proc/driver/ipc となる。
#define IPC_MAJOR 101               // メジャー番号、ipc_memより1つずらす

struct IPC_DEVICE {
        int use;                    // open 中は、use=1
        int is_readable;
        wait_queue_head_t write_q;  // write用のキュー
        wait_queue_head_t read_q;   // read用のキュー
        char msg[128];              // read待ちのメッセージ
};

static struct IPC_DEVICE *ipc_devs; // デバイスのワーク
static int ipc_devs_nr = 2;         // 用意するデバイス数
static struct semaphore ipc_sem;    // ipc_devs 操作用のセマフォ
static char proc_data[32];          // procfs で書き込まれたデータ
static int log = 0;                 // ログ取得するかどうかのフラグ

static int
ipc_open(struct inode* inode, struct file* filp)
{
        struct IPC_DEVICE *dev=NULL;
        int i;

        // 空いているデバイスを探す
        for(i=0; i<ipc_devs_nr; i++) {
                dev = ipc_devs + i;
                if (!dev->use)
                        break;   // 空きデバイスあり
        }
        if (i == ipc_devs_nr) {  // 空きデバイスなし
                return -EBUSY;
        }

        if (down_interruptible(&ipc_sem)) {
                printk(KERN_INFO "down_interruptible for read failed\n");
                return -ERESTARTSYS;
        }
        dev->use = 1;
        dev->is_readable = 0;
        up(&ipc_sem);

        // ワークを保存
        filp->private_data = (void *)dev;

        DPRINTK(KERN_INFO "%s no=%d\n", __func__, i);

        return 0;
}

static int
ipc_close(struct inode* inode, struct file* filp)
{
        struct IPC_DEVICE *dev = (struct IPC_DEVICE *)filp->private_data;

        if (down_interruptible(&ipc_sem)) {
                printk(KERN_INFO "down_interruptible for read failed\n");
                return -ERESTARTSYS;
        }
        dev->use = 0;
        up(&ipc_sem);

        DPRINTK(KERN_INFO "%s \n", __func__);

        return 0;
}

static ssize_t
ipc_read(struct file* filp, char* buf, size_t count, loff_t* pos)
{
        struct IPC_DEVICE *dev = (struct IPC_DEVICE *)filp->private_data;
        struct IPC_DEVICE *dst;
        int i;

        if (down_interruptible(&ipc_sem)) {
                printk(KERN_INFO "down_interruptible for read failed\n");
                return -ERESTARTSYS;
        }

        if (count < sizeof(dev->msg) ||
            copy_to_user(buf, dev->msg, sizeof(dev->msg))) {
                up(&ipc_sem);
                return -EFAULT;
        }
        dev->is_readable = 0;
        for(i=0; i<ipc_devs_nr; i++) {
                dst = ipc_devs + i;
                // 自分じゃないデバイス=書き込みしてきたデバイスを探す
                if (dst != dev && dst->use) {
                        // wake_up 関数中で相手プロセスを起こさないよう、_sync を使う
                        wake_up_interruptible_sync(&dst->write_q);
                        DPRINTK(KERN_INFO "%s wake_up %d\n", __func__, i);
                        break;
                }
        }

        up(&ipc_sem);

        if(log) {
                // ログ取得処理。ここでは、ひとまずsyslogに出力する。
                // 本来なら、他のログ取得専用のプロセスでメッセージを取得した方がいい。
                printk(KERN_INFO "%s %s\n", __func__, dev->msg);
        }

        return sizeof(dev->msg);
}

static ssize_t
ipc_write(struct file* filp, const char* buf, size_t count, loff_t* pos)
{
        struct IPC_DEVICE *dev = (struct IPC_DEVICE *)filp->private_data;
        struct IPC_DEVICE *dst = NULL;
        int i;

        if (down_interruptible(&ipc_sem)) {
                printk(KERN_INFO "down_interruptible for write failed\n");
                return -ERESTARTSYS;
        }

        for(i=0; i<ipc_devs_nr; i++) {
                dst = ipc_devs + i;
                // 自分じゃないデバイス=書き込み先バイスを探す
                if (dst != dev && dst->use) {
                        if (!dst->is_readable) {
                                if(count > sizeof(dst->msg))
                                        count = sizeof(dst->msg);
                                if(copy_from_user(dst->msg, buf, count)) {
                                        up(&ipc_sem);
                                        return -EFAULT;
                                }
                                dst->is_readable = 1;
                                // wake_up 関数中で相手プロセスを起こさないよう、_sync を使う
                                wake_up_interruptible_sync(&dst->read_q);
                                DPRINTK(KERN_INFO "%s wake_up %d\n", __func__, i);
                        }
                        break;
                }
        }

        up(&ipc_sem);

        if(log) {
                // ログ取得処理。ここでは、ひとまずsyslogに出力する。
                // 本来なら、他のログ取得専用のプロセスでメッセージを取得した方がいい。
                printk(KERN_INFO "%s %s\n", __func__, dst->msg);
        }

        return count;
}

static unsigned int
ipc_poll(struct file* filp, poll_table* wait)
{
        struct IPC_DEVICE *dev = (struct IPC_DEVICE *)filp->private_data;
        unsigned int retmask = 0;

        poll_wait(filp, &dev->read_q,  wait);
        poll_wait(filp, &dev->write_q, wait);

        if (dev->is_readable) {
                retmask |= (POLLIN | POLLRDNORM);
        }
        if (!dev->is_readable) {
                retmask |= (POLLOUT | POLLWRNORM);
        }
        DPRINTK(KERN_INFO "%s %x\n", __func__, retmask);

        return retmask;
}

static struct file_operations ipc_fops =
{
        .owner   = THIS_MODULE,
        .read    = ipc_read,
        .write   = ipc_write,
        .poll    = ipc_poll,
        .open    = ipc_open,
        .release = ipc_close,
};

// procfs の書き込み処理。 echo -n "XXX" > /proc/driver/ipc というのを想定。
static int
proc_write( struct file *filp, const char *buf, unsigned long len, void *data )
{
        if (len >= (sizeof(proc_data)-1)) {
                printk(KERN_INFO "%s length is over\n", __func__);
                return -ENOSPC;
        }

        // 書き込みデータの保存
        if (copy_from_user(proc_data, buf, len)) {
                return -EFAULT;
        }
        proc_data[len] = '\0';

        if (!strcmp(proc_data, "on")) {
                log = 1;   // ログ取得開始
        } else {
                log = 0;   // ログ取得停止
        }

        DPRINTK(KERN_INFO "%s =%s(%d) log=%d\n", __func__, proc_data, (int)len, log);
        return len;
}

// procfs の読み出し処理。 cat /proc/driver/ipc というのを想定。
// 今回のプログラムでは必須ではないが、書き込んだ文字列を読み出す。
static int
proc_read( char *page, char **start, off_t offset, int count, int *eof, void *data )
{
        int ret;
        ret = snprintf(page, count, "%s", proc_data);
        *eof = 1;
        DPRINTK(KERN_INFO "%s =%s(%d)\n", __func__, proc_data, ret);

        return ret;
}

int
init_module(void)
{
        struct IPC_DEVICE *dev;
        struct proc_dir_entry* entry;
        int i;

        if (register_chrdev(IPC_MAJOR, DEV_NAME, &ipc_fops)) {
                printk(KERN_INFO "register_chrdev failed\n");
                return -EBUSY;
        }
        // 全デバイス分の領域を取得
        ipc_devs = (struct IPC_DEVICE *)kmalloc((sizeof(*dev) * ipc_devs_nr), GFP_KERNEL);
        if (!ipc_devs) {
                return -ENOMEM;
        }

        for(i=0; i<ipc_devs_nr; i++) {
                dev = ipc_devs + i;
                dev->use = 0;
                dev->is_readable = 0;
                init_waitqueue_head(&dev->write_q);
                init_waitqueue_head(&dev->read_q);
                memset(dev->msg, 0, sizeof(dev->msg));
                printk(KERN_INFO "init %d\n", i);
        }
        sema_init(&ipc_sem, 1);

        // procfs用のエントリを作成
        entry = create_proc_entry(PROC_NAME, 0666, NULL);
        if (entry) {
                // write, read 用の関数を登録
                entry->write_proc  = proc_write;
                entry->read_proc  = proc_read;
                entry->owner = THIS_MODULE;
        } else {
                printk(KERN_ERR "create_proc_entry failed\n");
                return -EBUSY;
        }
        DPRINTK(KERN_INFO "ipc driver is loaded\n");

        return 0;
}

void
cleanup_module(void)
{
        if (unregister_chrdev(IPC_MAJOR, DEV_NAME) ) {
                printk(KERN_INFO "unregister_chrdev failed\n");
        }
        kfree(ipc_devs);

        remove_proc_entry(PROC_NAME, NULL);

        DPRINTK(KERN_INFO "ipc driver is unloaded\n");
}

オブジェクト名しか変わらないが、Makefile はこちら。

obj-m := ipc.o

all:
        make -C /lib/modules/$(shell uname -r)/build M=`pwd` V=1 modules

clean:
        make -C /lib/modules/$(shell uname -r)/build M=$(PWD) clean

同様に、デバイスファイルを作成する。

# /bin/mknod /dev/ipc c 101 0
# chmod 777 /dev/ipc

同様に、コンパイルして、ipc.ko をカーネルに組み込む。

# make
... 出力結果は省略 ...
# /sbin/insmod ipc.ko

次は動作確認用のユーザープロセス(rd.c)。
共有メモリの操作がなくなり、メッセージ送信はwrite(2)、メッセージ受信はread(2) と、より直感的なコードになった。POSIXのメッセージキューとよく似たプログラムだなぁ。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/select.h>
#include <fcntl.h>
#include <errno.h>

#define DPRINTF(...)
//#define DPRINTF(...) printf(__VA_ARGS__)

int main(int argc, char *argv[])
{
        int dev;
        fd_set rfds, wfds;
        char buf[128];
        int read_start;

        if ((dev = open("/dev/ipc", O_RDWR)) == -1) {
                perror("open ipc");
                exit(-1);
        }

        read_start=1;   // メッセージ受信から開始
        while (1) {
                if (read_start) {
                        buf[0] = '\0';

                        FD_ZERO(&rfds);
                        FD_SET(dev, &rfds);

                        DPRINTF("selecting read...\n");
                        if (select(dev+1, &rfds, NULL, NULL, NULL) == -1) {
                                perror("select");
                        }

                        if (FD_ISSET(dev, &rfds)) {
                               int n;
                                n = read(dev, buf, sizeof(buf));
                                if (n > 0 ) {
                                        // 受信メッセージの表示
                                        DPRINTF("%d = read [%s]\n", n, buf);
                                } else {
                                        perror("read");
                                        exit(-1);
                                }
                        }
                        read_start = 0;
                } else {
                        memset(buf, 'r', sizeof(buf));  // 送信メッセージの作成
                        buf[sizeof(buf)-1] = '\0';

                        FD_ZERO(&wfds);
                        FD_SET(dev, &wfds);
                        DPRINTF("selecting write...\n");
                        // 今回、writeのselectはなくてもよいが、共有メモリ方式と
                        // 処理内容を合わせるために使用
                        if (select(dev+1, NULL, &wfds, NULL, NULL) == -1) {
                                perror("select");
                                exit(-1);
                        }

                        if (FD_ISSET(dev, &wfds)) {
                                // メッセージの送信
                                if (write(dev, buf, sizeof(buf)) != sizeof(buf)) {
                                        perror("write");
                                        exit(-1);
                                }
                        }
                        read_start = 1;
//                      sleep(1);
                }
        }
        close(dev);

        return 0;
}


次は動作確認用のユーザープロセス(wr.c)。

include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/select.h>
#include <fcntl.h>
#include <errno.h>

#define DPRINTF(...)
//#define DPRINTF(...) printf(__VA_ARGS__)

int main(int argc, char *argv[])
{
        int dev;
        fd_set rfds, wfds;
        char buf[128];
        int read_start, i, nloop;

        if (argc != 2) {
                printf("usage:\n./wr <NLOOP>\n");
                exit(-1);
        }
        nloop = atoi(argv[1]);

        if ((dev = open("/dev/ipc", O_RDWR)) == -1) {
                perror("open ipc");
                exit(-1);
        }

        read_start=0;  // メッセージの送信から開始
        for (i=0; i<nloop; i++) {
                if (read_start) {
                        buf[0] = '\0';

                        FD_ZERO(&rfds);
                        FD_SET(dev, &rfds);

                        DPRINTF("selecting read...\n");
                        if (select(dev+1, &rfds, NULL, NULL, NULL) == -1) {
                                perror("select");
                        }

                        if (FD_ISSET(dev, &rfds)) {
                                int n;
                                n = read(dev, buf, sizeof(buf));
                                if (n > 0 ) {
                                        // 受信メッセージの表示
                                        DPRINTF("%d = read [%s]\n", n, buf);
                                } else {
                                        perror("read");
                                        exit(-1);
                                }
                        }
                        read_start = 0;
                } else {
                        memset(buf, 'w', sizeof(buf));  // 送信メッセージの作成
                        buf[sizeof(buf)-1] = '\0';

                        FD_ZERO(&wfds);
                        FD_SET(dev, &wfds);
                        DPRINTF("selecting write...\n");
                        // 今回、writeのselectはなくてもよいが、共有メモリ方式と
                        // 処理内容を合わせるために使用
                         if (select(dev+1, NULL, &wfds, NULL, NULL) == -1) {
                                perror("select");
                                exit(-1);
                        }

                        if (FD_ISSET(dev, &wfds)) {
                                // メッセージの送信
                                if (write(dev, buf, sizeof(buf)) != sizeof(buf)) {
                                        perror("write");
                                        exit(-1);
                                }
                        }
                        read_start = 1;
//                      sleep(1);
                }
        }
        printf("write loop= %d\n", i/2);
        close(dev);

        return 0;
}
all:
        gcc -g -o wr wr.c
        gcc -g -o rd rd.c
clean:
        rm -f *.o wr rd

早速実行して、プロセス間通信を100000回ぐらいやった時の時間を測定する。

$ make
gcc -g -o wr wr.c
gcc -g -o rd rd.c
$ ./rd&
$ time ./wr 100000
write loop= 50000

real    0m5.624s
user    0m0.004s
sys     0m2.792s


共有メモリ方式とメッセージread/write方式で、実行時間は 5.535 秒から 5.624 秒へと101.6%長くなった。この1.6%がメッセージのread/writeにかかるコストということになる。
128バイトのコピーがプロセス間通信の1.6%を占める?
長すぎるような気がしないわけでもないが、そういうことらしい。
近いうちに、Linuxネイティブ環境でも測定してみよう。



また、echoコマンド1つで、syslogにプロセス間通信のログ取得をON/OFFすることができるようになりました。他の文字をechoするまで、ログを取得し続けます。

$ echo -n "on" > /proc/driver/ipc   # ログ取得開始
$ echo -n "off" > /proc/driver/ipc   # ログ取得停止
  1. まとめ

デバイスドライバを用いて比較的高速なプロセス間通信ができました。
また、このデバイスドライバシェルスクリプトをから、プロセス間通信のログ取得を簡単にON/OFFできるので、アプリケーションのデバッグ/評価/市場トラブル(?)等で役に立つのではないか、と思います。

  1. TODO

近日中に以下をやっていきたいと思っています。