一、IO交互

文件下载过程中的IO交互主要分为以下几个步骤:
1、APP通过 avSendIOCtrl(IOCTRL_FILEMANAGER_FILE_LIST_REQ) 查询指定时间内的文件列表
2、设备通过 avSendIOCtrl(IOCTRL_FILEMANAGER_FILE_LIST_RESP) 回复文件列表
3、用户选择文件后,APP通过 avSendIOCtrl(IOCTRL_FILEMANAGER_FILE_DOWNLOAD_REQ) 通知设备
4、设备根据资源情况,通过 avSendIOCtrl(IOCTRL_FILEMANAGER_FILE_DOWNLOAD_RESP) 告知APP使用的通道数和API类型
5、双方创建通道并进行数据传输
6、传输完成后关闭通道
二、RDT通道的创建和销毁
创建(APP端/设备端通用)
int createChannelForDownload(int sid, int iotc_channel_id)
{
return RDT_Create(sid, 5000, iotcChannelId);
}
销毁(APP端/设备端通用)
void destoryChannelOfDownload(int rdt_id)
{
if (rdt_id < 0)
return;
RDT_Abort(rdt_id);
}
三、数据的传输
RDT协议本身提供的接口只有Read和Write,传输过程存在粘包的情况,需要另外设计切包组包的机制。每个数据包分为包头、数据和包尾,格式参考:公版RDT帧定义
设备端
说明:RDT的数据传输是可靠的传输,每次调用RDT_Write,都将数据写入本地的发送缓存区,所以等所有的数据都使用RDT_Write写入后,需要检查发送缓存区是否为空,为空,则说明数据已经送完。
写入数据相关(伪)代码:
写入数据相关(伪)代码:
#define RDT_HEADER_FRAMEBEGIN_ADDR 0
#define RDT_HEADER_FILENAME_ADDR 4
#define RDT_HEADER_FILESIZE_ADDR 68
#define RDT_HEADER_FRAMESIZE_ADDR 72
#define RDT_HEADER_ENDFLAG_ADDR 76
#define RDT_FRAME_BUFFER_ADDR 77
#define RDT_FRAME_BUFFER_SIZE 10243
#define RDT_FRAME_HEADER_SIZE 77
#define RDT_PAYLOAD_WRITE_POSTION 77
#define RDT_FRAME_TAIL_SIZE 2
#define RDT_FILENAME_MAX_LEN (RDT_HEADER_FILESIZE_ADDR - RDT_HEADER_FILENAME_ADDR) // 64字节
int createChannelForDownload(int sid, int iotc_channel_id);
void destroyChannelOfDownload(int rdt_id);
union typeXChange {
uint32_t uint32_data;
char char_data[4];
};
bool swapUint32ToCharBuffer(const uint32_t uint32_data, char* buffer, size_t bufferSize) {
if (!buffer || bufferSize != 4)
return false;
typeXChange tmp;
tmp.uint32_data = htonl(uint32_data); // 转换为网络字节序(大端)
memcpy(buffer, tmp.char_data, 4);
return true;
}
生成帧头
void createPacketHeader(char* buffer_write_postion, size_t bufferSize, const char* fileName, uint32_t fileSize, uint32_t payloadSize,bool endFlag) {
// 校验缓冲区是否足够存储header
if (!buffer_write_postion || bufferSize < RDT_FRAME_HEADER_SIZE)
return;
memset(buffer_write_postion, 0, RDT_FRAME_HEADER_SIZE);
// 写入帧头标识 IOTC
buffer_write_postion[0] = 'I';
buffer_write_postion[1] = 'O';
buffer_write_postion[2] = 'T';
buffer_write_postion[3] = 'C';
// 拷贝文件名:限制长度,手动加终止符
strncpy(buffer_write_postion + RDT_HEADER_FILENAME_ADDR, fileName, RDT_FILENAME_MAX_LEN);
buffer_write_postion[RDT_HEADER_FILESIZE_ADDR - 1] = '\0'; // 确保终止
// 写入文件大小、payload大小(转换为uint32_t,避免截断)
swapUint32ToCharBuffer(fileSize, buffer_write_postion + RDT_HEADER_FILESIZE_ADDR, 4);
swapUint32ToCharBuffer(payloadSize, buffer_write_postion + RDT_HEADER_FRAMESIZE_ADDR, 4);
// 写入结束标志
buffer_write_postion[RDT_HEADER_ENDFLAG_ADDR] = endFlag ? 1 : 0;
}
生成帧尾
void createPacketTail(char* buffer_write_postion, size_t bufferSize) {
if (!buffer_write_postion || bufferSize < RDT_FRAME_TAIL_SIZE)
return;
buffer_write_postion[0] = 'G';
buffer_write_postion[1] = 'C';
}
发送单个文件
int sendOneFile2Client(int rdt_id, file& f, bool isLastFile) {
int ret = 0;
bool rdt_endflag = false;
size_t fileSize = f.size();
std::string fileName = f.name();
uint32_t rdt_file_size = static_cast(fileSize); // 转为uint32_t(若文件超4G需改用uint64_t)
const size_t buffer_for_payload_size = RDT_FRAME_BUFFER_SIZE - RDT_FRAME_HEADER_SIZE - RDT_FRAME_TAIL_SIZE;
// 检查文件是否打开成功
if (!f.open()) {
fprintf(stderr, "File %s open failed\n", fileName.c_str());
return -1;
}
char* buffer = new char[RDT_FRAME_BUFFER_SIZE]();
if (!buffer) {
ret = -ENOMEM;
goto LAB_SEND_RETURN_ERROR;
}
while (true) {
// 读取payload:用ssize_t区分成功/失败
ssize_t readSize = f.read(buffer + RDT_PAYLOAD_WRITE_POSTION, buffer_for_payload_size);
if (readSize < 0) { // 读取错误
fprintf(stderr, "File %s read failed, err=%zd\n", fileName.c_str(), readSize);
delete[] buffer;
ret = -2;
goto LAB_SEND_RETURN_ERROR;
} else if (readSize == 0) { // 读取到文件尾
delete[] buffer;
break;
}
// 判断是否为最后一包
if (readSize < buffer_for_payload_size) {
if (isLastFile) {
rdt_endflag = true;
}
}
// 构建header和tail
createPacketHeader(buffer, RDT_FRAME_BUFFER_SIZE, fileName.c_str(), rdt_file_size, static_cast(readSize), rdt_endflag);
createPacketTail(buffer + RDT_FRAME_HEADER_SIZE + readSize, RDT_FRAME_TAIL_SIZE);
//发送完整包长度(header + payload + tail)
size_t send_len = RDT_FRAME_HEADER_SIZE + readSize + RDT_FRAME_TAIL_SIZE;
ret = RDT_Write(rdt_id, buffer, send_len);
if (ret < 0) {
fprintf(stderr, "RDT_Write failed, err=%d\n", ret);
delete[] buffer;
goto LAB_SEND_RETURN_ERROR;
}
// 最后一包:刷新并退出
if (rdt_endflag) {
RDT_Flush(rdt_id);
break;
}
}
if(buffer) delete[] buffer; // 释放缓冲区
buffer = NULL;
f.close();
return 0; // 成功返回0
LAB_SEND_RETURN_ERROR:
if(buffer) delete[] buffer; // 释放缓冲区:
f.close(); // 确保文件关闭,避免泄漏
return ret;
}
发送文件列表
void sendFileList(void* arg, int sid, int iotc_channel_id) {
int rdt_id = createChannelForDownload(sid, iotc_channel_id);
if (rdt_id < 0) {
fprintf(stderr, "Create channel failed, rdt_id=%d\n", rdt_id);
return;
}
int ret = 0;
FileList* fileList = static_cast(arg);
int fileCount = fileList->count();
int fileIndex = 1;
//遍历文件,发送文件列表
for (auto& file : fileList->getFiles()) {
bool isLastFile = (fileCount == fileIndex);
ret = sendOneFile2Client(rdt_id, file, isLastFile);
if (ret < 0) {
fprintf(stderr, "Send file %s error %d\n", file.name().c_str(), ret);
break;
}
fileIndex++;
}
// 检查发送队列是否清空
if (ret == 0) {
do {
st_RDT_Status status;
ret = RDT_Status_Check(rdt_id, &status);
if (ret < 0) {
fprintf(stderr, "RDT_Status_Check failed, err=%d\n", ret);
break;
}
if (status.BufSizeInSendQueue == 0) {
break;
} else {
usleep(100 * 1000); // 替代msleep,跨平台
}
} while (true);
}
// 关闭通道
destroyChannelOfDownload(rdt_id);
}
APP端
接收端也需要按照对应的格式进行切包和解析数据,如果切包有误,则可能导致数据解析异常。
读取通道数据和解析方法(伪)代码:
读取通道数据和解析方法(伪)代码:
//读数据头
#define HEADER_SIZE 77
#define RDT_BUFFER_SIZE 20480
int createChannelForDownload(int sid, int iotc_channel_id);
void destroyChannelOfDownload(int rdt_id);
// 解析RDT Header
bool parseRDTHeader(const char* headerBuffer, std::string& fileName, int& dataLength, bool& endFlag) {
// 示例:从headerBuffer解析fileName、dataLength、endFlag
fileName = "example.txt";
dataLength = 1024;
endFlag = false;
return true;
}
// 读取完整的RDT Header
int readRDTHeader(int rdt_id, char* headerBuffer, int bufferSize) {
if (rdt_id < 0 || !headerBuffer || bufferSize < HEADER_SIZE) {
fprintf(stderr, "readRDTHeader: invalid params\n");
return -1;
}
memset(headerBuffer, 0, HEADER_SIZE); // 仅清空Header部分,避免越界
int restSize = HEADER_SIZE;
int headerIndex = 0;
int retryCount = 0; // 超时重试计数
do {
//传入rdt_id参数
int size = RDT_Read(rdt_id, headerBuffer + headerIndex, restSize, 1000);
if (size > 0) {
headerIndex += size;
restSize -= size;
retryCount = 0; // 成功读取,重置重试计数
} else if (size == RDT_ER_TIMEOUT) {
retryCount++;
if (retryCount >= MAX_READ_RETRY) { // 超过最大重试次数,退出
fprintf(stderr, "readRDTHeader: timeout after %d retries\n", MAX_READ_RETRY);
return RDT_ER_TIMEOUT;
}
usleep(10 * 1000); // 超时后短暂休眠,避免频繁重试
} else { // 其他错误(如通道关闭)
fprintf(stderr, "readRDTHeader: RDT_Read error %d\n", size);
return size;
}
} while (restSize > 0);
return 0;
}
读取二进制数据并保存到文件
int readBinaryDataAndSaveOneFrame(file& f, int rdt_id,char* dataBuffer, int bufferSize,int dataLength) {
// bufferSize需大于0,dataLength需大于0
if (rdt_id < 0 || !dataBuffer || bufferSize <= 0 || dataLength <= readbinarydataandsaveoneframe:="" invalid="" return="" if="" file="" not="" int="" restdatasize="" dataindex="0;" retrycount="0;" while=""> 0) {
// 动态调整读取大小:取bufferSize和剩余数据的较小值
int readSize = (bufferSize < restDataSize) ? bufferSize : restDataSize;
int size = RDT_Read(rdt_id, dataBuffer + dataIndex, readSize, 1000);
if (size > 0) {
// 写入文件:仅写入实际读取的size字节
int writeRet = f.write(dataBuffer + dataIndex, size);
if (writeRet < 0) {
fprintf(stderr, "readBinaryDataAndSaveOneFrame: file write error\n");
return -3;
}
restDataSize -= size;
dataIndex += size;
retryCount = 0;
} else if (size == RDT_ER_TIMEOUT) {
retryCount++;
if (retryCount >= MAX_READ_RETRY) {
fprintf(stderr, "readBinaryDataAndSaveOneFrame: timeout\n");
return RDT_ER_TIMEOUT;
}
usleep(100 * 1000);
} else { // 其他错误
fprintf(stderr, "readBinaryDataAndSaveOneFrame: RDT_Read error %d\n", size);
return size;
}
}
return 0;
}
读取尾标识
int readTail(int rdt_id) {
const int TAIL_SIZE = 2;
char buffer[TAIL_SIZE] = {0};
int restSize = TAIL_SIZE;
int bufferIndex = 0; // 初始化偏移量
int retryCount = 0;
do {
int size = RDT_Read(rdt_id, buffer + bufferIndex, restSize, 1000);
if (size > 0) {
bufferIndex += size;
restSize -= size;
retryCount = 0;
} else if (size == RDT_ER_TIMEOUT) {
retryCount++;
if (retryCount >= MAX_READ_RETRY) {
fprintf(stderr, "readTail: timeout\n");
return RDT_ER_TIMEOUT;
}
usleep(100 * 1000);
} else {
fprintf(stderr, "readTail: RDT_Read error %d\n", size);
return size;
}
} while (restSize > 0);
// 校验尾标识
if (buffer[0] != 'G' || buffer[1] != 'C') {
fprintf(stderr, "readTail: invalid tail (expected GC, got %c%c)\n", buffer[0], buffer[1]);
return -1;
}
return 0;
}
接收文件主逻辑
int recvFilesFromChannel(int rdt_id) {
if (rdt_id < 0) {
fprintf(stderr, "recvFilesFromChannel: invalid rdt_id\n");
return -1;
}
bool endFlag = false;
int ret = 0;
file f;
std::string fileName, lastFileName;
char headerBuffer[HEADER_SIZE] = {0};
char dataBuffer[RDT_BUFFER_SIZE] = {0};
while (!endFlag) {
// 1. 读取Header
ret = readRDTHeader(rdt_id, headerBuffer, sizeof(headerBuffer));
if (ret < 0) {
fprintf(stderr, "recvFilesFromChannel: read header failed %d\n", ret);
break;
}
// 2. 解析Header
int dataLength = 0;
if (!parseRDTHeader(headerBuffer, fileName, dataLength, endFlag)) {
fprintf(stderr, "recvFilesFromChannel: parse header failed\n");
ret = -2;
break;
}
// 3. 处理文件打开/切换
if (!f.isOpen()) {
if (!f.open(fileName)) { // 检查文件打开结果
fprintf(stderr, "recvFilesFromChannel: open file %s failed\n", fileName.c_str());
ret = -3;
break;
}
lastFileName = fileName;
fprintf(stdout, "recvFilesFromChannel: open file %s\n", fileName.c_str());
} else {
if (fileName != lastFileName) {
f.close(); // 关闭旧文件
if (!f.open(fileName)) {
fprintf(stderr, "recvFilesFromChannel: open new file %s failed\n", fileName.c_str());
ret = -3;
break;
}
lastFileName = fileName;
fprintf(stdout, "recvFilesFromChannel: switch to file %s\n", fileName.c_str());
}
}
// 4. 读取二进制数据并保存
ret = readBinaryDataAndSaveOneFrame(f, rdt_id, dataBuffer, RDT_BUFFER_SIZE, dataLength);
if (ret < 0) {
fprintf(stderr, "recvFilesFromChannel: read data failed %d\n", ret);
break;
}
// 5. 读取并校验尾标识
ret = readTail(rdt_id);
if (ret < 0) {
fprintf(stderr, "recvFilesFromChannel: read tail failed %d\n", ret);
break;
}
}
// 确保文件关闭,避免资源泄漏
if (f.isOpen()) {
f.close();
fprintf(stdout, "recvFilesFromChannel: close file %s\n", lastFileName.c_str());
}
return ret;
}
接收线程
struct ChannelParams { // 定义参数结构体,传递外部数据
int sid;
int iotc_channel_id;
};
void createRDTChannelAndSaveFiles(void* arg) {
if (!arg) {
fprintf(stderr, "createRDTChannelAndSaveFiles: arg is null\n");
return;
}
ChannelParams* params = static_cast(arg);
int rdt_id = createChannelForDownload(params->sid, params->iotc_channel_id);
if (rdt_id < 0) {
fprintf(stderr, "createRDTChannelAndSaveFiles: create channel failed %d\n", rdt_id);
return;
}
// 接收数据并处理错误
int ret = recvFilesFromChannel(rdt_id);
if (ret < 0) {
fprintf(stderr, "createRDTChannelAndSaveFiles: recv files failed %d\n", ret);
} else {
fprintf(stdout, "createRDTChannelAndSaveFiles: recv files success\n");
}
destroyChannelOfDownload(rdt_id);
}
四、结束标志的判断
在文件传输过程中,结束标志的判断非常重要。根据前面的代码实现,结束标志的处理主要体现在以下几个方面:
- 包尾标识:每个数据包的结尾都有"GC"标识,用于确认数据包的完整性
- 文件结束标志:当传输最后一个文件时,会设置endflag为1
- 缓存区检查:发送完成后,通过RDT_Status_Check检查发送缓存区是否为空
- 读取大小判断:当读取的数据大小小于缓冲区大小时,认为是文件的最后一包
这些机制共同确保了文件传输的可靠性和完整性,避免了数据丢失或传输不完整的问题。
