實現 UDP 可靠性傳輸(KCP 介紹使用)

1、TCP 協議介紹

TCP 協議是基於 IP 協議,面向連接,可靠基於字節流的傳輸層協議 1、基於 IP 協議:TCP 協議是基於 IP 協議之上傳輸的,TCP 協議報文中的源端口 + IP 協議報文中的源地址 + TCP 協議報文中的目標端口 + IP 協議報文中的目標地址,組合起來唯一確定一條 TCP 連接。 2、面向連接:與 UDP 不同,TCP 在傳輸數據之前,需要進行三次握手,建立一條 TCP 連接,然後在進行數據傳輸,釋放需要進行四次揮手。 3、基於字節流:流的含義是不間斷的數據結構,這裏指的是沒有邊界的報文結構,假如發送內容比較大,TCP 協議棧會將數據切成一塊一塊放入內核中。

在這裏插入圖片描述

1.1、ARQ 協議

TCP 之所以能實現可靠的數據傳輸,正是因爲基於 ARQ 協議,ARQ 協議 (Automatic Repeat-reQuest),即自動重傳請求,是傳輸層的糾正協議,在不可靠的網絡中實現可靠的信息傳輸。 ARQ 主要有 3 種模式: 1、停等式 2、回退 n 幀 3、選擇性重傳

1.2、停等式

停等式協議工作原理如下: 1、發送方對接收方發送數據包,等待接收方回覆 ack,並且開始計時 2、在等待過程中發送方停止發送數據 3、當數據包沒有成功被接收方接收,接收方是不會發生 ack,等待一段時間後,發送方會重新發送數據包 4、反覆這個過程直到接收到 ack 缺點:較長的等待時間,使發送數據緩慢。

1.3、回退 n 幀

爲了解決上面的長時間等待 ack 的缺陷,連續 ARQ 協議會,連續發送一組數據包,然後會等待這些數據包的 ack。

什麼是滑動窗口?

發送方和接收方都會維護一個數據幀序列,這個序列被稱爲窗口,發送方的窗口是由接受方確定的,目的是控制發送方的速度,避免接收方的緩存不夠,而導致數據溢出,同時限制網絡中的流量,避免網絡阻塞,協議中規定,對於窗口內未經確定的分組進行重傳。

回退 n 幀

回退 n 幀允許發送方在等待超時的間歇,可以繼續發送分組,所有分組攜帶序列號,在 GBN 協議中,發送方需要響應以下三件事件: 1、上層的調用,上層調用相應 send() 時,發送方首先要檢索發送窗口是否滿 2、接收 ack,在該協議中,對序號 n 的分組的確定採取累積確認的方式,表明接收方已正確接收 n 以前的的所有分組 3、超時,若出現超時,發送方將重傳所有已發生但未被確定的分組 下圖:序號爲 2 的分組,丟失了,後面的所有分組都需要重新傳 GBN 採用的技術包括序號、累積確認、檢驗和以及計時 / 重傳。

1.4、選擇性重傳

雖然 GBN 改善了停等式中等待時間過長的缺陷,但是依舊存在性能問題,而 SR 協議通過讓發送方僅重傳在接收時丟失的分組,從而避免不必要的重傳。

發送方:

SR 協議中發送方需要響應以下三件事: 1、從上層接收數據,當從上層接收數據後,發送方需檢查下一個可用於該分組的序號,若序號在窗口中則發送數據。 2、接收 ACK。若收到 ACK,且該分組在窗口內,則發送方將那個被確認的分組標記爲已接收。若該分組序號等於基序號,則窗口序號向前移動到具有最小序號的未確認分組處。若窗口移動後並且有序號落在窗口內的未發送分組,則發送這些分組。 3、超時。若出現超時,發送方將重傳已發出但還未確認的分組。與 GBN 不同的是, SR 協議中的每個分組都有獨立的計時器。

接收方:

在 SR 協議下,接收方需響應以下三種事件: (假設接收窗口的基序號爲 4,分組長度也爲 4) 1、序號在 [4,7] 內的分組被正確接收。該情況下,收到的分組落在接收方的窗口內,一個 ACK 將發送給發送方。若該分組是以前沒收到的分組,則被緩存。若該分組的序號等於基序號 4,則該分組以及以前緩存的序號連續的分組都交付給上層,然後,接收窗口將向前移動。 2、序號在 [0,3] 內的分組被正確接收。在該情況下,必須產生一個 ACK,儘管該分組是接收方 以前已確認過的分組。若接收方不確認該分組,發送方窗口將不能向前移動。 3、其他情況。忽略該分組對於接收方來說,若一個分組正確接收而不管其是否按序,則接收方會爲該分組返回一個 ACK 給發送方。失序的分組將被緩存,直到所有丟失的分組都被收到,這時纔可以將一批分組按序交付給上層。

1.5、RTT 和 RTO

RTT

RTT 是指數據包從發送端發送出去到接收端收到併發送確認回來所經過的時間。它表示了數據包在網絡中傳輸的延遲,通常以毫秒(ms)爲單位。RTT 的測量通常通過發送方發送一個數據包,然後在接收到對應的確認回覆時計算出來。

RTO

RTO 是指在發送方發送數據包後,等待確認回覆的超時時間。當發送方發送一個數據包後,它會啓動一個定時器,如果在 RTO 時間內未收到對應的確認回覆,發送方會認爲數據包已丟失或損壞,並觸發重傳機制。RTO 的值通常是根據過去的 RTT 值來動態調整的,以適應網絡的變化。發送方會維護一個估計的往返時間(Estimated Round-Trip Time,ERTT),並根據 ERTT 計算出 RTO 的值。常見的算法是基於加權平均值,如 Karn 算法或 Jacobson/Karels 算法

1.6、流量控制

接收方

接收方每次收到數據包,可以在發送確定報文的時候,同時告訴發送方自己的緩存區還剩餘多少是空閒的,我們也把緩存區的剩餘大小稱之爲接收窗口大小,用變量 win 來表示接收窗口的大小。

發送方

發送方收到之後,便會調整自己的發送速率,也就是調整自己發送窗口的大小,當發送方收到接收窗口的大小爲 0 時,發送方就會停止發送數據,防止出現大量丟包情況的發生。

流量控制 - 發送方何時再繼續發送數據?

當發送方停止發送數據後,該怎樣才能知道自己可以繼續發送數據? 1、當接收方處理好數據,接受窗口 win > 0 時,接收方發個通知報文去通知發送方,告訴他可以繼續發送數據了。當發送方收到窗口大於 0 的報文時,就繼續發送數據。 2、當發送方收到接受窗口 win = 0 時,這時發送方停止發送報文,並且同時開啓一個定時器,每隔一段時間就發個測試報文去詢問接收方,打聽是否可以繼續發送數據了,如果可以,接收方就告訴他此時接受窗口的大小;如果接受窗口大小還是爲 0,則發送方再次刷新啓動定時器。

流量控制 - 小結

  1. 1. 通信的雙方都擁有兩個滑動窗口,一個用於接受數據,稱之爲接收窗口;一個用於發送數據,稱之爲擁塞窗口 (即發送窗口)。指出接受窗口大小的通知我們稱之爲窗口通告。 2、接收窗口的大小固定嗎?接受窗口的大小是根據某種算法動態調整的。 3、接收窗口越大越好嗎?當接收窗口達到某個值的時候,再增大的話也不怎麼會減少丟包率的了,而且還會更加消耗內存。所以接收窗口的大小必須根據網絡環境以及發送發的的擁塞窗口來動態調整。 4、發送窗口和接受窗口相等嗎?接收方在發送確認報文的時候,會告訴發送發自己的接收窗口大小,而發送方的發送窗口會據此來設置自己的發送窗口,但這並不意味着他們就會相等。首先接收方把確認報文發出去的那一刻,就已經在一邊處理堆在自己緩存區的數據了,所以一般情況下接收窗口 >= 發送窗口。

1.7、擁塞控制

大家可能都聽說過擁塞控制和流量控制,想必也有一些人可能還分不清擁塞控制和流量控制,進而把他們當作一回事。擁塞控制和流量控制雖然採取的動作很相似,但擁塞控制與網絡的擁堵情況相關聯,而流量控制與接收方的緩存狀態相關聯。也就是說,擁塞控制和流量控制是針對完全不同的問題而採取的措施。今天這篇文章,我們先來講講擁塞控制。

鏈接: 5 分鐘讀懂擁塞控制

2、KCP

2.1、KCP 介紹

KCP 是一種網絡傳輸協議 (A Fast and Reliable ARQ Protocol),可以視它爲 TCP 的代替品,但是它運行於用戶空間,它不管底層的發送與接收,只是個純算法實現可靠傳輸,它的特點是犧牲帶寬來降低延遲。因爲 TCP 協議的大公無私,經常犧牲自己速度來減少網絡擁塞,它是從大局上考慮的。而 KCP 是自私的,它只顧自己的傳輸效率,從不管整個網絡的擁塞情況。舉個例子,TCP 檢測到丟包的時候,首先想到的是網絡擁塞了,要放慢自己的速度別讓網絡更糟,而 KCP 想到的趕緊重傳別耽誤事。

TCP

TCP 的特點是可靠傳輸 (累積確認、超時重傳、選擇確認)、流量控制 (滑動窗口)、擁塞控制 (慢開始、擁塞避免、快重傳、快恢復)、面向連接。KCP 對這些參數基本都可配,也沒用建立 / 關閉連接的過程

KCP

其實 KCP 並不神祕,因爲 TCP 的高度自治 (很多東西都不可配),滿足不了如今各種速度需求。而 KCP 就是基於 UDP 協議,再將一些 TCP 經典的機制移植過來,變成參數可配。

2.2、TCP vs KCP

以 10%-20% 帶寬浪費的代價換取了比 TCP 快 30%-40% 的傳輸速度 1、RTO 翻倍 vs 不翻倍

TCP 超時計算是 RTOx2,這樣連續丟三次包就變成 RTOx8 了,十分恐怖,而 KCP 啓 動快速模式後不 x2,只是 x1.5(實驗證明 1.5 這個值相對比較好),提高了傳輸速度。 200 300 450 675 – 200 400 800 1600

2、選擇性重傳 vs 全部重傳

TCP 丟包時會全部重傳從丟的那個包開始以後的數據,KCP 是選擇性重傳,只重傳真正丟失的數據包。

3、快速重傳(跳過多少個包馬上重傳)(如果使用了快速重傳,可以不考慮 RTO))

發送端發送了 1,2,3,4,5 幾個包,然後收到遠端的 ACK: 1, 3, 4, 5,當收到 ACK3 時,KCP 知道 2 被跳過 1 次,收到 ACK4 時,知道 2 被跳過了 2 次,此時可以認爲 2 號丟失,不用等超時,直接重傳 2 號包,大大改善了丟包時的傳輸速度。 fastresend=2

4、延遲 ACK vs 非延遲 ACK

TCP 爲了充分利用帶寬,延遲發送 ACK(NODELAY 都沒用),這樣超時計算會算出較大 RTT 時間,延長了丟包時的判斷過程。KCP 的 ACK 是否延遲發送可以調節。

5、UNA vs ACK+UNA

ARQ 模型響應有兩種,UNA(此編號前所有包已收到,如 TCP)和 ACK(該編號包已收到),光用 UNA 將導致全部重傳,光用 ACK 則丟失成本太高,以往協議都是二選其一,而 KCP 協議中,除去單獨的 ACK 包外,所有包都有 UNA 信息。

6、非退讓流控

KCP 正常模式同 TCP 一樣使用公平退讓法則,即發送窗口大小由:發送緩存大小、接收端剩餘接收緩存大小、丟包退讓及慢啓動這四要素決定。但傳送及時性要求很高的小數據時,可選擇通過配置跳過後兩步,僅用前兩項來控制發送頻率。以犧牲部分公平性及帶寬利用率之代價,換取了開着 BT 都能流暢傳輸的效果。

2.3、KCP 使用

1、創建 KCP 對象

// 初始化 kcp 對象,conv 爲一個表示會話編號的整數,和 tcp 的 conv 一樣,通信雙 // 方需保證 conv 相同,相互的數據包才能夠被認可,user 是一個給回調函數的指針 ikcpcb *kcp = ikcp_create(conv, user);

2、設置回調函數

// KCP 的下層協議輸出函數,KCP 需要發送數據時會調用它 // buf/len 表示緩存和長度 // user 指針爲 kcp 對象創建時傳入的值,用於區別多個 KCP 對象 int udp_output(const char *buf, int len, ikcpcb *kcp, void *user) { .... } // 設置回調函數 kcp->output = udp_output;

3、循環調用 update

// 以一定頻率調用 ikcp_update 來更新 kcp 狀態,並且傳入當前時鐘(毫秒單位) // 如 10ms 調用一次,或用 ikcp_check 確定下次調用 update 的時間不必每次調用 ikcp_update(kcp, millisec);

4、輸入一個下層數據包

// 收到一個下層數據包(比如 UDP 包)時需要調用: ikcp_input(kcp, received_udp_packet, received_udp_size); 處理了下層協議的輸出 / 輸入後 KCP 協議就可以正常工作了,使用 ikcp_send 來向 遠端發送數據。而另一端使用 ikcp_recv(kcp, ptr, size) 來接收數據。

協議配置 協議默認模式是一個標準的 ARQ,需要通過配置打開各項加速開關: 1、工作模式:

int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc) nodelay :是否啓用 nodelay 模式,0 不啓用;1 啓用。 interval :協議內部工作的 interval,單位毫秒,比如 10ms 或者 20ms resend :快速重傳模式,默認 0 關閉,可以設置 2(2 次 ACK 跨越將會直接重傳) nc :是否關閉流控,默認是 0 代表不關閉,1 代表關閉。 普通模式: ikcp_nodelay(kcp, 0, 40, 0, 0); 極速模式: ikcp_nodelay(kcp, 1, 10, 2, 1);

2、最大窗口:

int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd); 該調用將會設置協議的最大發送窗口和最大接收窗口大小,默認爲 32. 這個可以理解爲 TCP 的 SND_BUF 和 RCV_BUF,只不過單位不一樣 SND/RCV_BUF 單位是字節,這個單位是包。

3、最大傳輸單元:

純算法協議並不負責探測 MTU,默認 mtu 是 1400 字節,可以使用 ikcp_setmtu 來設置該值。該值將會影響數據包歸併及分片時候的最大傳輸單元。

4、最小 RTO:

不管是 TCP 還是 KCP 計算 RTO 時都有最小 RTO 的限制,即便計算出來 RTO 爲 40ms,由於默認的 RTO 是 100ms,協議只有在 100ms 後才能檢測到丟包,快速模式下爲 30ms,可以手動更改該值: kcp->rx_minrto = 10;

2.4、client 端

delay.h

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>



#define DELAY_BODY_SIZE 1300
typedefstructdelay_obj
{
uint16_t seqno;// 序列號
int64_t send_time;// 發送時間
int64_t recv_time;// 回來時間
uint8_t body[DELAY_BODY_SIZE];
}t_delay_obj;

int64_t iclock64();
uint32_t iclock();
t_delay_obj *delay_new();
void delay_set_seqno(t_delay_obj *obj, uint16_t seqno);
void delay_set_seqno_send_time(t_delay_obj *obj, uint16_t seqno);
void delay_set_send_time(t_delay_obj *obj);
void delay_set_recv_time(t_delay_obj *obj);
void delay_print_rtt_time(t_delay_obj *objs, int num);

delay.c

#include "delay.h"
#include <sys/types.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>

#include <sys/time.h>
#include <sys/wait.h>

/* get system time */
void itimeofday(long *sec, long *usec)
{
#if defined(__unix)
structtimeval time;
gettimeofday(&time,NULL);
if(sec)*sec = time.tv_sec;
if(usec)*usec = time.tv_usec;
#else
staticlong mode =0, addsec =0;
int retval;
staticint64_t freq =1;
int64_t qpc;
if(mode ==0){
        retval =QueryPerformanceFrequency((LARGE_INTEGER*)&freq);
        freq =(freq ==0)?1: freq;
        retval =QueryPerformanceCounter((LARGE_INTEGER*)&qpc);
        addsec =(long)time(NULL);
        addsec = addsec -(long)((qpc / freq)&0x7fffffff);
        mode =1;
}
    retval =QueryPerformanceCounter((LARGE_INTEGER*)&qpc);
    retval = retval *2;
if(sec)*sec =(long)(qpc / freq)+ addsec;
if(usec)*usec =(long)((qpc % freq)*1000000/ freq);
#endif
}

/* get clock in millisecond 64 */
int64_t iclock64(void)
{
long s, u;
int64_t value;
itimeofday(&s,&u);
    value =((int64_t)s)*1000+(u /1000);
return value;
}

uint32_t iclock()
{
return(uint32_t)(iclock64()&0xfffffffful);
}


inline t_delay_obj *delay_new() {
    t_delay_obj *obj =(t_delay_obj *)malloc(sizeof(t_delay_obj));
if(!obj){
returnNULL;
}
    obj->seqno =0;
    obj->send_time =0;
    obj->recv_time =0;
}

inline void delay_set_seqno(t_delay_obj *obj, uint16_t seqno) {
    obj->seqno = seqno;
}

inline void delay_set_seqno_send_time(t_delay_obj *obj, uint16_t seqno) {
    obj->seqno = seqno;
    obj->send_time =iclock64();
}

inline void delay_set_send_time(t_delay_obj *obj) {
    obj->send_time =iclock64();
}

inline void delay_set_recv_time(t_delay_obj *obj) {
    obj->recv_time =iclock64();
}

inline void delay_print_rtt_time(t_delay_obj *objs, int num) {
for(int i =0; i < num; i++){
        t_delay_obj *obj =&(objs[i]);
printf("%04d seqno:%d rtt  :%ldms\n", i, obj->seqno, obj->recv_time - obj->send_time);
// printf("%04d seqno:%d snd_t:%ldms\n", i, obj->seqno, obj->send_time);
// printf("%04d seqno:%d rcv_t:%ldms\n", i, obj->seqno, obj->recv_time);
}
}

client.c

#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include "ikcp.h"

#include <sys/time.h>
#include <sys/wait.h>
#include <arpa/inet.h>


#include "delay.h"
#define DELAY_TEST2_N 1
#define UDP_RECV_BUF_SIZE 1500

// 編譯:  gcc -o client client.c ikcp.c delay.c  -lpthread


typedefstruct{
unsignedchar*ipstr;
int port;

    ikcpcb *pkcp;

int sockfd;
structsockaddr_in addr;//存放服務器的結構體

char buff[UDP_RECV_BUF_SIZE];//存放收發的消息
}kcpObj;


/* sleep in millisecond */
void isleep(unsigned long millisecond)
{
#ifdef __unix  /* usleep( time * 1000 ); */
structtimespec ts;
    ts.tv_sec =(time_t)(millisecond /1000);
    ts.tv_nsec =(long)((millisecond %1000)*1000000);
/*nanosleep(&ts, NULL);*/
usleep((millisecond <<10)-(millisecond <<4)-(millisecond <<3));
#elif defined(_WIN32)
Sleep(millisecond);
#endif
}



int udp_output(const char *buf, int len, ikcpcb *kcp, void *user){

//  printf("使用udp_output發送數據\n");

    kcpObj *send =(kcpObj *)user;

//發送信息
int n =sendto(send->sockfd, buf, len,0,(struct sockaddr *)&send->addr,sizeof(struct sockaddr_in));//【】
if(n >=0)
{
//會重複發送,因此犧牲帶寬
printf("send:%d bytes\n", n);//24字節的KCP頭部
return n;
}
else
{
printf("udp_output: %d bytes send, error\n", n);
return-1;
}
}


int init(kcpObj *send)
{
    send->sockfd =socket(AF_INET,SOCK_DGRAM,0);

if(send->sockfd <0)
{
perror("socket error!");
exit(1);
}

bzero(&send->addr,sizeof(send->addr));

//設置服務器ip、port
    send->addr.sin_family=AF_INET;
    send->addr.sin_addr.s_addr =inet_addr((char*)send->ipstr);
    send->addr.sin_port =htons(send->port);

printf("sockfd = %d ip = %s  port = %d\n",send->sockfd,send->ipstr,send->port);

}

// 特別說明,當我們使用kcp測試rtt的時候,如果發現rtt過大,很大一種可能是分片數據沒有及時發送出去,需要調用ikcp_flush更快速將分片發送出去。
void delay_test2(kcpObj *send) {
// 初始化 100個 delay obj
char buf[UDP_RECV_BUF_SIZE];
unsignedint len =sizeof(struct sockaddr_in);

size_t obj_size =sizeof(t_delay_obj);
    t_delay_obj *objs =malloc(DELAY_TEST2_N *sizeof(t_delay_obj));
int ret =0;

int recv_objs =0;
//ikcp_update包含ikcp_flush,ikcp_flush將發送隊列中的數據通過下層協議UDP進行發送
ikcp_update(send->pkcp,iclock());//不是調用一次兩次就起作用,要loop調用
for(int i =0; i < DELAY_TEST2_N; i++){
//  isleep(1);
delay_set_seqno_send_time(&objs[i], i);
        ret =ikcp_send(send->pkcp,(char*)&objs[i], obj_size);
if(ret <0){
printf("send %d seqno:%u failed, ret:%d, obj_size:%ld\n", i, objs[i].seqno, ret, obj_size);
return;
}
// ikcp_flush(send->pkcp);  // 調用flush能更快速把分片發送出去
//ikcp_update包含ikcp_flush,ikcp_flush將發送隊列中的數據通過下層協議UDP進行發送
ikcp_update(send->pkcp,iclock());//不是調用一次兩次就起作用,要loop調用

int n =recvfrom(send->sockfd, buf, UDP_RECV_BUF_SIZE, MSG_DONTWAIT,(struct sockaddr *)&send->addr,&len);
// printf("print recv1:%d\n", n);
if(n <0){//檢測是否有UDP數據包 
// isleep(1);
continue;
}
        ret =ikcp_input(send->pkcp, buf, n);// 從 linux api recvfrom先扔到kcp引擎
if(ret <0)//檢測ikcp_input是否提取到真正的數據
{
//printf("ikcp_input ret = %d\n",ret);
continue;// 沒有讀取到數據
}
        ret =ikcp_recv(send->pkcp,(char*)&objs[i], obj_size);
if(ret <0)//檢測ikcp_recv提取到的數據 
{
printf("ikcp_recv1 ret = %d\n",ret);
continue;
}
delay_set_recv_time(&objs[recv_objs]);
        recv_objs++;
printf("recv1 %d seqno:%d, ret:%d\n", recv_objs, objs[i].seqno, ret);
if(ret != obj_size){
printf("recv1 %d seqno:%d failed, size:%d\n", i, objs[i].seqno, ret);
delay_print_rtt_time(objs, i);
return;
}
}

// 還有沒有發送完畢的數據
for(int i = recv_objs; i < DELAY_TEST2_N;){
//  isleep(1);
//ikcp_update包含ikcp_flush,ikcp_flush將發送隊列中的數據通過下層協議UDP進行發送
ikcp_update(send->pkcp,iclock());//不是調用一次兩次就起作用,要loop調用
//ikcp_flush(send->pkcp);  // 調用flush能更快速把分片發送出去  
int n =recvfrom(send->sockfd, buf, UDP_RECV_BUF_SIZE, MSG_DONTWAIT,(struct sockaddr *)&send->addr,&len);
// printf("recv2:%d\n", n);
if(n <0){//檢測是否有UDP數據包
// printf("recv2:%d\n", n);
isleep(1);
continue;
}

        ret =ikcp_input(send->pkcp, buf, n);
if(ret <0)//檢測ikcp_input是否提取到真正的數據
{
printf("ikcp_input2 ret = %d\n",ret);
continue;// 沒有讀取到數據
}
        ret =ikcp_recv(send->pkcp,(char*)&objs[i], obj_size);
if(ret <0)//檢測ikcp_recv提取到的數據 
{
// printf("ikcp_recv2 ret = %d\n",ret);
continue;
}
printf("recv2 %d seqno:%d, ret:%d\n", recv_objs,  objs[i].seqno, ret);
delay_set_recv_time(&objs[recv_objs]);
        recv_objs++;
        i++;
if(ret != obj_size){
printf("recv2 %d seqno:%d failed, size:%d\n", i, objs[i].seqno, ret);
delay_print_rtt_time(objs, i);
return;
}

}
ikcp_flush(send->pkcp);

delay_print_rtt_time(objs, DELAY_TEST2_N);
}

void loop(kcpObj *send)
{
unsignedint len =sizeof(struct sockaddr_in);
int n,ret;

// while(1)
{
isleep(1);
delay_test2(send);
}
printf("loop finish\n");
close(send->sockfd);

}

int main(int argc,char *argv[])
{
//printf("this is kcpClient,請輸入服務器 ip地址和端口號:\n");
if(argc !=3)
{
printf("請輸入服務器ip地址和端口號\n");
return-1;
}
printf("this is kcpClient\n");
int64_t cur =iclock64();
printf("main started t:%ld\n", cur);// prints Hello World!!!
unsignedchar*ipstr =(unsignedchar*)argv[1];
unsignedchar*port  =(unsignedchar*)argv[2];

    kcpObj send;
    send.ipstr = ipstr;
    send.port =atoi(argv[2]);

init(&send);//初始化send,主要是設置與服務器通信的套接字對象

bzero(send.buff,sizeof(send.buff));

// 每個連接都是需要對應一個ikcpcb
    ikcpcb *kcp =ikcp_create(0x1,(void*)&send);//創建kcp對象把send傳給kcp的user變量
    kcp->output = udp_output;//設置kcp對象的回調函數
ikcp_nodelay(kcp,0,10,0,0);//(kcp1, 0, 10, 0, 0); 1, 10, 2, 1
ikcp_wndsize(kcp,128,128);
ikcp_setmtu(kcp,1400);
    send.pkcp = kcp;
loop(&send);//循環處理
ikcp_release(send.pkcp);
printf("main finish t:%ldms\n",iclock64()- cur);
return0;
}

2.5、server 端

#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include "ikcp.h"
#define RECV_BUF 1500

staticint number =0;

typedefstruct
{
unsignedchar*ipstr;
int port;

    ikcpcb *pkcp;

int sockfd;

structsockaddr_in addr;//存放服務器信息的結構體
structsockaddr_inCientAddr;//存放客戶機信息的結構體

char buff[RECV_BUF];//存放收發的消息

} kcpObj;
// 編譯:  gcc -o server server.c ikcp.c  
// 特別需要注意,這裏的服務器端也只能一次使用,即是等客戶端退出後,服務端也要停止掉再啓動
// 之所以是這樣,主要是因爲sn的問題,比如客戶端第一次啓動 sn 0~5, 第二次啓動發送的sn還是0 ~5 如果服務器端不停止則自己以爲0~5已經收到過了就不會回覆。

// 在真正使用的時候,還需要另外的通道讓客戶端和服務器端之前重新創建ikcpcb,以匹配ikcpcb的conv
/* get system time */
void itimeofday(long *sec, long *usec)
{
#if defined(__unix)
structtimeval time;
gettimeofday(&time,NULL);
if(sec)
*sec = time.tv_sec;
if(usec)
*usec = time.tv_usec;
#else
staticlong mode =0, addsec =0;
    BOOL retval;
static IINT64 freq =1;
    IINT64 qpc;
if(mode ==0)
{
        retval =QueryPerformanceFrequency((LARGE_INTEGER *)&freq);
        freq =(freq ==0)?1: freq;
        retval =QueryPerformanceCounter((LARGE_INTEGER *)&qpc);
        addsec =(long)time(NULL);
        addsec = addsec -(long)((qpc / freq)&0x7fffffff);
        mode =1;
}
    retval =QueryPerformanceCounter((LARGE_INTEGER *)&qpc);
    retval = retval *2;
if(sec)
*sec =(long)(qpc / freq)+ addsec;
if(usec)
*usec =(long)((qpc % freq)*1000000/ freq);
#endif
}

/* get clock in millisecond 64 */
IINT64 iclock64(void)
{
long s, u;
    IINT64 value;
itimeofday(&s,&u);
    value =((IINT64)s)*1000+(u /1000);
return value;
}

IUINT32 iclock()
{
return(IUINT32)(iclock64()&0xfffffffful);
}

int64_t first_recv_time =0;
/* sleep in millisecond */
void isleep(unsigned long millisecond)
{
#ifdef __unix /* usleep( time * 1000 ); */
structtimespec ts;
    ts.tv_sec =(time_t)(millisecond /1000);
    ts.tv_nsec =(long)((millisecond %1000)*1000000);
/*nanosleep(&ts, NULL);*/
usleep((millisecond <<10)-(millisecond <<4)-(millisecond <<3));
#elif defined(_WIN32)
Sleep(millisecond);
#endif
}

int udp_output(const char *buf, int len, ikcpcb *kcp, void *user)
{

    kcpObj *send =(kcpObj *)user;

//發送信息
int n =sendto(send->sockfd, buf, len,0,(struct sockaddr *)&send->CientAddr,sizeof(struct sockaddr_in));
if(n >=0)
{
//會重複發送,因此犧牲帶寬
printf("send: %d bytes, t:%lld\n", n,iclock64()- first_recv_time);//24字節的KCP頭部
return n;
}
else
{
printf("error: %d bytes send, error\n", n);
return-1;
}
}

int init(kcpObj *send)
{
    send->sockfd =socket(AF_INET, SOCK_DGRAM,0);

if(send->sockfd <0)
{
perror("socket error!");
exit(1);
}

bzero(&send->addr,sizeof(send->addr));

    send->addr.sin_family = AF_INET;
    send->addr.sin_addr.s_addr =htonl(INADDR_ANY);//INADDR_ANY
    send->addr.sin_port =htons(send->port);

printf("服務器socket: %d  port:%d\n", send->sockfd, send->port);

if(send->sockfd <0)
{
perror("socket error!");
exit(1);
}

if(bind(send->sockfd,(struct sockaddr *)&(send->addr),sizeof(struct sockaddr_in))<0)
{
perror("bind");
exit(1);
}
}

void loop(kcpObj *send)
{
unsignedint len =sizeof(struct sockaddr_in);
int n, ret;
//接收到第一個包就開始循環處理
int recv_count =0;

isleep(1);
ikcp_update(send->pkcp,iclock());

char buf[RECV_BUF]={0};

while(1)
{
isleep(1);
ikcp_update(send->pkcp,iclock());
//處理收消息
        n =recvfrom(send->sockfd, buf, RECV_BUF, MSG_DONTWAIT,(struct sockaddr *)&send->CientAddr,&len);
if(n >0)
{
printf("UDP recv[%d]  size= %d   \n", recv_count++, n);
if(first_recv_time ==0)
{
                first_recv_time =iclock64();
}
//預接收數據:調用ikcp_input將裸數據交給KCP,這些數據有可能是KCP控制報文,並不是我們要的數據。
//kcp接收到下層協議UDP傳進來的數據底層數據buffer轉換成kcp的數據包格式
            ret =ikcp_input(send->pkcp, buf, n);
if(ret <0)
{
continue;
}
//kcp將接收到的kcp數據包還原成之前kcp發送的buffer數據
            ret =ikcp_recv(send->pkcp, buf, n);//從 buf中 提取真正數據,返回提取到的數據大小
if(ret <0)
{// 沒有檢測ikcp_recv提取到的數據
isleep(1);
continue;
}
int send_size = ret;
//ikcp_send只是把數據存入發送隊列,沒有對數據加封kcp頭部數據
//應該是在kcp_update裏面加封kcp頭部數據
//ikcp_send把要發送的buffer分片成KCP的數據包格式,插入待發送隊列中。
            ret =ikcp_send(send->pkcp, buf, send_size);
printf("Server reply ->  bytes[%d], ret = %d\n", send_size, ret);
ikcp_flush(send->pkcp);// 快速flush一次 以更快讓客戶端收到數據
            number++;
}
elseif(n ==0)
{
printf("finish loop\n");
break;
}
else
{
// printf("n:%d\n", n);
}
}
}

intmain(int argc,char*argv[])
{
printf("this is kcpServer\n");
if(argc <2)
{
printf("請輸入服務器端口號\n");
return-1;
}

    kcpObj send;
    send.port =atoi(argv[1]);
    send.pkcp =NULL;

bzero(send.buff,sizeof(send.buff));
charMsg[]="Server:Hello!";//與客戶機後續交互
memcpy(send.buff,Msg,sizeof(Msg));

    ikcpcb *kcp =ikcp_create(0x1,(void*)&send);//創建kcp對象把send傳給kcp的user變量
ikcp_setmtu(kcp,1400);
    kcp->output = udp_output;//設置kcp對象的回調函數
ikcp_nodelay(kcp,0,10,0,0);//1, 10, 2, 1
ikcp_wndsize(kcp,128,128);

    send.pkcp = kcp;

init(&send);//服務器初始化套接字
loop(&send);//循環處理

return0;
}

2.6、KCP 協議介紹

在這裏插入圖片描述

conv :連接號。UDP是⽆連接的,conv⽤於表示來⾃於哪個客戶端。對連接的⼀種替代,因爲有 conv ,所
以KCP也是⽀持多路復⽤的。
cmd :命令類型,只有四種
frg :分⽚,⽤戶數據可能會被分成多個KCP包,發送出去
在 xtaci/kcp-go 的實現中,這個字段始終爲0,以及沒有意義了,詳⻅issues/121
wnd :接收窗⼝⼤⼩,發送⽅的發送窗⼝不能超過接收⽅給出的數值,(其實是接收窗⼝的剩餘⼤⼩,這個
⼤⼩是動態變化的)
ts :時間序列
sn :序列號
una :下⼀個可接收的序列號。其實就是確認號,收到sn=10的包,una爲11
len :數據⻓度(DATA的⻓度)
data :⽤戶數據

2、CMD 四種類型

IKCP_CMD_PUSH 數據推送命令
IKCP_CMD_ACK 確認命令
IKCP_CMD_WASK 接收窗⼝⼤⼩詢問命令
IKCP_CMD_WINS 接收窗⼝⼤⼩告知命令

IKCP_CMD_PUSH 和 IKCP_CMD_ACK 關聯 IKCP_CMD_WASK 和 IKCP_CMD_WINS 關聯

2.7、使用流程

  1. 1. 創建 KCP 對象:ikcpcb *kcp = ikcp_create(conv, user);

  2. 2. 設置傳輸回調函數(如 UDP 的 send 函數):kcp->output = udp_output;

  3. 3. 真正發送數據需要調用 sendto

  4. 4. 循環調用 update:ikcp_update(kcp, millisec);

  5. 5. 輸入一個應用層數據包(如 UDP 收到的數據包): ikcp_input(kcp,received_udp_packet,received_udp_size);

  6. 6. 我們要使用 recvfrom 接收,然後扔到 kcp 裏面做解析

  7. 7. 發送數據:ikcp_send(kcp1, buffer, 8); 用戶層接口

  8. 8. 接收數據:hr = ikcp_recv(kcp2, buffer, 10);

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/_ZpWTvT8MDUYsaon2S6oTg