快速消息队列 (FMQ)

HIDL 的远程过程调用 (RPC) 基础设施使用 Binder 机制,这意味着调用会产生开销、需要内核操作,并且可能触发调度程序操作。但是,对于必须在进程之间传输数据且开销更小且不涉及内核的情况,则使用快速消息队列 (FMQ) 系统。

FMQ 使用所需的属性创建消息队列。您可以通过 HIDL RPC 调用发送 MQDescriptorSyncMQDescriptorUnsync 对象,接收进程使用该对象来访问消息队列。

队列类型

Android 支持两种队列类型(称为特性

  • 非同步队列允许溢出,并且可以有多个读取器;每个读取器都必须及时读取数据,否则会丢失数据。
  • 同步队列不允许溢出,并且只能有一个读取器。

两种队列类型都不允许下溢(从空队列读取会失败),并且只能有一个写入器。

非同步队列

非同步队列只有一个写入器,但可以有任意数量的读取器。队列只有一个写入位置;但是,每个读取器都跟踪其自己的独立读取位置。

只要写入操作不大于配置的队列容量(大于队列容量的写入操作会立即失败),写入队列始终会成功(不检查溢出)。由于每个读取器可能具有不同的读取位置,因此数据不会等待每个读取器都读取每条数据,而是会在新写入需要空间时从队列中掉落。

读取器负责在数据从队列末尾掉落之前检索数据。尝试读取超过可用数据的读取操作会立即失败(如果是非阻塞的)或等待足够的数据可用(如果是阻塞的)。尝试读取超过队列容量的数据的读取操作始终会立即失败。

如果读取器未能跟上写入器,以至于该读取器写入但尚未读取的数据量超过队列容量,则下一次读取不会返回数据;而是将读取器的读取位置重置为写入位置加上容量的一半,然后返回失败。这会留出缓冲区的一半可供读取,并为新的写入操作保留空间,以避免立即再次溢出队列。如果在溢出后但在下一次读取之前检查了可读取的数据,则会显示可读取的数据量大于队列容量,这表明发生了溢出。(如果在检查可用数据和尝试读取该数据之间队列溢出,则溢出的唯一指示是读取失败。)

同步队列

同步队列具有一个写入器和一个读取器,以及一个写入位置和一个读取位置。不可能写入超过队列可用空间的数据,也不可能读取超过队列当前保存的数据。根据调用的是阻塞还是非阻塞写入或读取函数,尝试超出可用空间或数据的操作会立即返回失败,或者阻塞直到可以完成所需的操作。尝试读取或写入超过队列容量的数据始终会立即失败。

设置 FMQ

消息队列需要多个 MessageQueue 对象:一个用于写入,一个或多个用于读取。没有显式配置哪个对象用于写入或读取;用户负责确保没有对象同时用于读取和写入,最多有一个写入器,对于同步队列,最多有一个读取器。

创建第一个 MessageQueue 对象

消息队列通过单个调用创建和配置

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized nonblocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • MessageQueue<T, flavor>(numElements) 初始化程序创建并初始化一个支持消息队列功能的对象。
  • MessageQueue<T, flavor>(numElements, configureEventFlagWord) 初始化程序创建并初始化一个支持消息队列功能和阻塞的对象。
  • flavor 可以是用于同步队列的 kSynchronizedReadWrite 或用于非同步队列的 kUnsynchronizedWrite
  • uint16_t(在本示例中)可以是任何 HIDL 定义的类型,该类型不涉及嵌套缓冲区(没有 stringvec 类型)、句柄或接口。
  • kNumElementsInQueue 指示队列大小(以条目数计);它确定为队列分配的共享内存缓冲区的大小。

创建第二个 MessageQueue 对象

消息队列的第二端是使用从第一端获得的 MQDescriptor 对象创建的。MQDescriptor 对象通过 HIDL 或 AIDL RPC 调用发送到拥有消息队列第二端的进程。MQDescriptor 包含有关队列的信息,包括

  • 映射缓冲区和写入指针的信息。
  • 映射读取指针的信息(如果队列是同步的)。
  • 映射事件标志字的信息(如果队列是阻塞的)。
  • 对象类型 (<T, flavor>),其中包括队列元素的 HIDL 定义的类型和队列特性(同步或非同步)。

您可以使用 MQDescriptor 对象来构造 MessageQueue 对象

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

resetPointers 参数指示在创建此 MessageQueue 对象时是否将读取和写入位置重置为 0。在非同步队列中,读取位置(对于非同步队列中的每个 MessageQueue 对象是本地的)始终在创建期间设置为 0。通常,MQDescriptor 在创建第一个消息队列对象期间初始化。为了更好地控制共享内存,您可以手动设置 MQDescriptorMQDescriptorsystem/libhidl/base/include/hidl/MQDescriptor.h 中定义),然后如本节所述创建每个 MessageQueue 对象。

阻塞队列和事件标志

默认情况下,队列不支持阻塞读取和写入。有两种类型的阻塞读取和写入调用

  • 短格式,带有三个参数(数据指针、项目数、超时),支持对单个队列上的单个读取和写入操作进行阻塞。使用此格式时,队列在内部处理事件标志和位掩码,并且第一个消息队列对象必须使用 true 的第二个参数进行初始化。例如
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • 长格式,带有六个参数(包括事件标志和位掩码),支持在多个队列之间使用共享 EventFlag 对象,并允许指定要使用的通知位掩码。在这种情况下,必须将事件标志和位掩码提供给每个读取和写入调用。

对于长格式,您可以在每个 readBlocking()writeBlocking() 调用中显式提供 EventFlag。您可以初始化具有内部事件标志的队列之一,然后必须使用 getEventFlagWord() 从该队列的 MessageQueue 对象中提取该事件标志,并在每个进程中创建 EventFlag 对象以用于其他 FMQ。或者,您可以使用任何合适的共享内存初始化 EventFlag 对象。

通常,每个队列应仅使用非阻塞、短格式阻塞或长格式阻塞之一。混合使用它们没有错误,但需要仔细编程才能获得所需的结果。

将内存标记为只读

默认情况下,共享内存具有读取和写入权限。对于非同步队列 (kUnsynchronizedWrite),写入器可能希望在分发 MQDescriptorUnsync 对象之前删除所有读取器的写入权限。这可确保其他进程无法写入队列,建议这样做是为了防止读取器进程中的错误或不良行为。如果写入器希望读取器能够在每次使用 MQDescriptorUnsync 创建队列的读取端时重置队列,则内存不能标记为只读。这是 MessageQueue 构造函数的默认行为。因此,如果此队列有现有用户,则需要更改其代码以使用 resetPointer=false 构造队列。

  • 写入器:使用设置为只读 (PROT_READ) 的 MQDescriptor 文件描述符和区域调用 ashmem_set_prot_region
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • 读取器:使用 resetPointer=false 创建消息队列(默认为 true
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

使用 MessageQueue

MessageQueue 对象的公共 API 为

size_t availableToWrite() // Space available (number of elements).
size_t availableToRead() // Number of elements available.
size_t getQuantumSize() // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data); // read one T from FMQ; true if successful.
bool read(T* data, size_t count); // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

// APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

您可以使用 availableToWrite()availableToRead() 来确定可以在单个操作中传输多少数据。在非同步队列中

  • availableToWrite() 始终返回队列的容量。
  • 每个读取器都有自己的读取位置,并为 availableToRead() 执行自己的计算。
  • 从慢速读取器的角度来看,队列允许溢出;这可能会导致 availableToRead() 返回的值大于队列的大小。溢出后的第一次读取会失败,并导致该读取器的读取位置设置为等于当前写入指针,无论是否通过 availableToRead() 报告了溢出。

如果所有请求的数据都可以(并且已)传输到队列和从队列传输,则 read()write() 方法返回 true。这些方法不阻塞;它们要么成功(并返回 true),要么立即返回失败 (false)。

readBlocking()writeBlocking() 方法等待直到可以完成请求的操作,或者直到它们超时(timeOutNanos 值为 0 表示永不超时)。

阻塞操作是使用事件标志字实现的。默认情况下,每个队列都会创建并使用自己的标志字来支持 readBlocking()writeBlocking() 的短格式。多个队列可以共享一个字,以便进程可以等待对任何队列的写入或读取。通过调用 getEventFlagWord(),您可以获取指向队列的事件标志字的指针,并且可以使用该指针(或指向合适的共享内存位置的任何指针)来创建 EventFlag 对象,以传递到不同队列的 readBlocking()writeBlocking() 的长格式。readNotificationwriteNotification 参数指示应使用事件标志中的哪些位来指示该队列上的读取和写入。readNotificationwriteNotification 是 32 位位掩码。

readBlocking() 等待 writeNotification 位;如果该参数为 0,则调用始终会失败。如果 readNotification 值为 0,则调用不会失败,但成功的读取不会设置任何通知位。在同步队列中,这意味着除非在其他位置设置了该位,否则相应的 writeBlocking() 调用永远不会唤醒。在非同步队列中,writeBlocking() 不会等待(仍应使用它来设置写入通知位),并且读取不设置任何通知位是合适的。类似地,如果 readNotification 为 0,则 writeblocking() 会失败,并且成功的写入会设置指定的 writeNotification 位。

要一次等待多个队列,请使用 EventFlag 对象的 wait() 方法来等待通知位掩码。wait() 方法返回一个状态字,其中设置了导致唤醒的位。然后,此信息用于验证相应的队列是否具有足够的空间或数据来进行所需的写入和读取操作,并执行非阻塞 write()read()。要获取操作后通知,请再次调用 EventFlag 对象的 wake() 方法。有关 EventFlag 抽象的定义,请参阅 system/libfmq/include/fmq/EventFlag.h

零复制操作

readwritereadBlockingwriteBlocking() 方法将指向输入-输出缓冲区的指针作为参数,并在内部使用 memcpy() 调用在相同缓冲区和 FMQ 环形缓冲区之间复制数据。为了提高性能,Android 8.0 及更高版本包含一组 API,这些 API 提供对环形缓冲区的直接指针访问,从而无需使用 memcpy 调用。

对零复制 FMQ 操作使用以下公共 API

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • beginWrite 方法提供进入 FMQ 环形缓冲区的基指针。写入数据后,使用 commitWrite() 提交数据。beginReadcommitRead 方法的作用相同。
  • beginReadWrite 方法将要读取和写入的消息数作为输入,并返回一个布尔值,指示读取或写入是否可能。如果读取或写入是可能的,则 memTx 结构将填充可用于直接指针访问到环形缓冲区共享内存的基指针。
  • MemRegion 结构包含有关内存块的详细信息,包括基指针(内存块的基地址)和以 T 表示的长度(以消息队列的 HIDL 定义类型表示的内存块的长度)。
  • MemTransaction 结构包含两个 MemRegion 结构,firstsecond,因为读取或写入到环形缓冲区可能需要回绕到队列的开头。这意味着需要两个基指针才能将数据读取和写入到 FMQ 环形缓冲区。

要从 MemRegion 结构获取基地址和长度

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

要获取对 MemTransaction 对象中的第一个和第二个 MemRegion 结构的引用

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

使用零复制 API 写入 FMQ 的示例

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

以下帮助程序方法也是 MemTransaction 的一部分

  • T* getSlot(size_t idx); 返回指向此 MemTransaction 对象的 MemRegions 中的槽 idx 的指针。如果 MemTransaction 对象表示要读取和写入 N 个类型为 T 的项目的内存区域,则 idx 的有效范围介于 0 和 N-1 之间。
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);nMessages 个类型为 T 的项目写入对象描述的内存区域,从索引 startIdx 开始。此方法使用 memcpy(),不适合用于零复制操作。如果 MemTransaction 对象表示要读取和写入 N 个类型为 T 的项目的内存,则 idx 的有效范围介于 0 和 N-1 之间。
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1); 是一种帮助程序方法,用于从对象描述的内存区域读取 nMessages 个类型为 T 的项目,从 startIdx 开始。此方法使用 memcpy(),不适合用于零复制操作。

通过 HIDL 发送队列

在创建端

  1. 如上所述创建消息队列对象。
  2. 使用 isValid() 验证对象是否有效。
  3. 如果要通过将 EventFlag 传递到 readBlocking()writeBlocking() 的长格式来等待多个队列,则可以从初始化为创建标志的 MessageQueue 对象中提取事件标志指针(使用 getEventFlagWord()),并使用该标志来创建必要的 EventFlag 对象。
  4. 使用 MessageQueue 方法 getDesc() 获取描述符对象。
  5. 在 HAL 文件中,为该方法提供类型为 fmq_syncfmq_unsync 的参数,其中 T 是合适的 HIDL 定义类型。使用此参数将 getDesc() 返回的对象发送到接收进程。

在接收端

  1. 使用描述符对象创建 MessageQueue 对象。使用相同的队列特性和数据类型,否则模板将无法编译。
  2. 如果提取了事件标志,请从接收进程中相应的 MessageQueue 对象中提取该标志。
  3. 使用 MessageQueue 对象传输数据。