comparison rtmpproto.c @ 5416:e01b6917b3cf libavformat

Implement RTMP output (publishing FLV stream to RTMP server). Patch by Sergiy (piratfm at `do-no-evil-mail`.com)
author kostya
date Fri, 04 Dec 2009 16:52:16 +0000
parents 0728f77cd599
children 9d7de5529047
comparison
equal deleted inserted replaced
5415:584c1ba93552 5416:e01b6917b3cf
45 45
46 /** RTMP protocol handler state */ 46 /** RTMP protocol handler state */
47 typedef enum { 47 typedef enum {
48 STATE_START, ///< client has not done anything yet 48 STATE_START, ///< client has not done anything yet
49 STATE_HANDSHAKED, ///< client has performed handshake 49 STATE_HANDSHAKED, ///< client has performed handshake
50 STATE_RELEASING, ///< client releasing stream before publish it (for output)
51 STATE_FCPUBLISH, ///< client FCPublishing stream (for output)
50 STATE_CONNECTING, ///< client connected to server successfully 52 STATE_CONNECTING, ///< client connected to server successfully
51 STATE_READY, ///< client has sent all needed commands and waits for server reply 53 STATE_READY, ///< client has sent all needed commands and waits for server reply
52 STATE_PLAYING, ///< client has started receiving multimedia data from server 54 STATE_PLAYING, ///< client has started receiving multimedia data from server
55 STATE_PUBLISHING, ///< client has started sending multimedia data to server (for output)
53 } ClientState; 56 } ClientState;
54 57
55 /** protocol handler context */ 58 /** protocol handler context */
56 typedef struct RTMPContext { 59 typedef struct RTMPContext {
57 URLContext* stream; ///< TCP stream used in interactions with RTMP server 60 URLContext* stream; ///< TCP stream used in interactions with RTMP server
63 ClientState state; ///< current state 66 ClientState state; ///< current state
64 int main_channel_id; ///< an additional channel ID which is used for some invocations 67 int main_channel_id; ///< an additional channel ID which is used for some invocations
65 uint8_t* flv_data; ///< buffer with data for demuxer 68 uint8_t* flv_data; ///< buffer with data for demuxer
66 int flv_size; ///< current buffer size 69 int flv_size; ///< current buffer size
67 int flv_off; ///< number of bytes read from current buffer 70 int flv_off; ///< number of bytes read from current buffer
71 RTMPPacket out_pkt; ///< rtmp packet, created from flv a/v or metadata (for output)
68 } RTMPContext; 72 } RTMPContext;
69 73
70 #define PLAYER_KEY_OPEN_PART_LEN 30 ///< length of partial key used for first client digest signing 74 #define PLAYER_KEY_OPEN_PART_LEN 30 ///< length of partial key used for first client digest signing
71 /** Client key used for digest signing */ 75 /** Client key used for digest signing */
72 static const uint8_t rtmp_player_key[] = { 76 static const uint8_t rtmp_player_key[] = {
95 */ 99 */
96 static void gen_connect(URLContext *s, RTMPContext *rt, const char *proto, 100 static void gen_connect(URLContext *s, RTMPContext *rt, const char *proto,
97 const char *host, int port) 101 const char *host, int port)
98 { 102 {
99 RTMPPacket pkt; 103 RTMPPacket pkt;
100 uint8_t ver[32], *p; 104 uint8_t ver[64], *p;
101 char tcurl[512]; 105 char tcurl[512];
102 106
103 ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 4096); 107 ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 4096);
104 p = pkt.data; 108 p = pkt.data;
105 109
108 ff_amf_write_number(&p, 1.0); 112 ff_amf_write_number(&p, 1.0);
109 ff_amf_write_object_start(&p); 113 ff_amf_write_object_start(&p);
110 ff_amf_write_field_name(&p, "app"); 114 ff_amf_write_field_name(&p, "app");
111 ff_amf_write_string(&p, rt->app); 115 ff_amf_write_string(&p, rt->app);
112 116
117 if (rt->is_input) {
113 snprintf(ver, sizeof(ver), "%s %d,%d,%d,%d", RTMP_CLIENT_PLATFORM, RTMP_CLIENT_VER1, 118 snprintf(ver, sizeof(ver), "%s %d,%d,%d,%d", RTMP_CLIENT_PLATFORM, RTMP_CLIENT_VER1,
114 RTMP_CLIENT_VER2, RTMP_CLIENT_VER3, RTMP_CLIENT_VER4); 119 RTMP_CLIENT_VER2, RTMP_CLIENT_VER3, RTMP_CLIENT_VER4);
120 } else {
121 snprintf(ver, sizeof(ver), "FMLE/3.0 (compatible; %s)", LIBAVFORMAT_IDENT);
122 ff_amf_write_field_name(&p, "type");
123 ff_amf_write_string(&p, "nonprivate");
124 }
115 ff_amf_write_field_name(&p, "flashVer"); 125 ff_amf_write_field_name(&p, "flashVer");
116 ff_amf_write_string(&p, ver); 126 ff_amf_write_string(&p, ver);
117 ff_amf_write_field_name(&p, "tcUrl"); 127 ff_amf_write_field_name(&p, "tcUrl");
118 ff_amf_write_string(&p, tcurl); 128 ff_amf_write_string(&p, tcurl);
129 if (rt->is_input) {
119 ff_amf_write_field_name(&p, "fpad"); 130 ff_amf_write_field_name(&p, "fpad");
120 ff_amf_write_bool(&p, 0); 131 ff_amf_write_bool(&p, 0);
121 ff_amf_write_field_name(&p, "capabilities"); 132 ff_amf_write_field_name(&p, "capabilities");
122 ff_amf_write_number(&p, 15.0); 133 ff_amf_write_number(&p, 15.0);
123 ff_amf_write_field_name(&p, "audioCodecs"); 134 ff_amf_write_field_name(&p, "audioCodecs");
124 ff_amf_write_number(&p, 1639.0); 135 ff_amf_write_number(&p, 1639.0);
125 ff_amf_write_field_name(&p, "videoCodecs"); 136 ff_amf_write_field_name(&p, "videoCodecs");
126 ff_amf_write_number(&p, 252.0); 137 ff_amf_write_number(&p, 252.0);
127 ff_amf_write_field_name(&p, "videoFunction"); 138 ff_amf_write_field_name(&p, "videoFunction");
128 ff_amf_write_number(&p, 1.0); 139 ff_amf_write_number(&p, 1.0);
140 }
129 ff_amf_write_object_end(&p); 141 ff_amf_write_object_end(&p);
130 142
131 pkt.data_size = p - pkt.data; 143 pkt.data_size = p - pkt.data;
132 144
133 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]); 145 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
146 }
147
148 /**
149 * Generates 'releaseStream' call and sends it to the server. It should make
150 * the server release some channel for media streams.
151 */
152 static void gen_release_stream(URLContext *s, RTMPContext *rt)
153 {
154 RTMPPacket pkt;
155 uint8_t *p;
156
157 ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0,
158 29 + strlen(rt->playpath));
159
160 av_log(LOG_CONTEXT, AV_LOG_DEBUG, "Releasing stream...\n");
161 p = pkt.data;
162 ff_amf_write_string(&p, "releaseStream");
163 ff_amf_write_number(&p, 2.0);
164 ff_amf_write_null(&p);
165 ff_amf_write_string(&p, rt->playpath);
166
167 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
168 ff_rtmp_packet_destroy(&pkt);
169 }
170
171 /**
172 * Generates 'FCPublish' call and sends it to the server. It should make
173 * the server preapare for receiving media streams.
174 */
175 static void gen_fcpublish_stream(URLContext *s, RTMPContext *rt)
176 {
177 RTMPPacket pkt;
178 uint8_t *p;
179
180 ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0,
181 25 + strlen(rt->playpath));
182
183 av_log(LOG_CONTEXT, AV_LOG_DEBUG, "FCPublish stream...\n");
184 p = pkt.data;
185 ff_amf_write_string(&p, "FCPublish");
186 ff_amf_write_number(&p, 3.0);
187 ff_amf_write_null(&p);
188 ff_amf_write_string(&p, rt->playpath);
189
190 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
191 ff_rtmp_packet_destroy(&pkt);
192 }
193
194 /**
195 * Generates 'FCUnpublish' call and sends it to the server. It should make
196 * the server destroy stream.
197 */
198 static void gen_fcunpublish_stream(URLContext *s, RTMPContext *rt)
199 {
200 RTMPPacket pkt;
201 uint8_t *p;
202
203 ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0,
204 27 + strlen(rt->playpath));
205
206 av_log(LOG_CONTEXT, AV_LOG_DEBUG, "UnPublishing stream...\n");
207 p = pkt.data;
208 ff_amf_write_string(&p, "FCUnpublish");
209 ff_amf_write_number(&p, 5.0);
210 ff_amf_write_null(&p);
211 ff_amf_write_string(&p, rt->playpath);
212
213 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
214 ff_rtmp_packet_destroy(&pkt);
134 } 215 }
135 216
136 /** 217 /**
137 * Generates 'createStream' call and sends it to the server. It should make 218 * Generates 'createStream' call and sends it to the server. It should make
138 * the server allocate some channel for media streams. 219 * the server allocate some channel for media streams.
145 av_log(LOG_CONTEXT, AV_LOG_DEBUG, "Creating stream...\n"); 226 av_log(LOG_CONTEXT, AV_LOG_DEBUG, "Creating stream...\n");
146 ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 25); 227 ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 25);
147 228
148 p = pkt.data; 229 p = pkt.data;
149 ff_amf_write_string(&p, "createStream"); 230 ff_amf_write_string(&p, "createStream");
150 ff_amf_write_number(&p, 3.0); 231 ff_amf_write_number(&p, rt->is_input ? 3.0 : 4.0);
151 ff_amf_write_null(&p); 232 ff_amf_write_null(&p);
233
234 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
235 ff_rtmp_packet_destroy(&pkt);
236 }
237
238
239 /**
240 * Generates 'deleteStream' call and sends it to the server. It should make
241 * the server remove some channel for media streams.
242 */
243 static void gen_delete_stream(URLContext *s, RTMPContext *rt)
244 {
245 RTMPPacket pkt;
246 uint8_t *p;
247
248 av_log(LOG_CONTEXT, AV_LOG_DEBUG, "Deleting stream...\n");
249 ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 34);
250
251 p = pkt.data;
252 ff_amf_write_string(&p, "deleteStream");
253 ff_amf_write_number(&p, 0.0);
254 ff_amf_write_null(&p);
255 ff_amf_write_number(&p, rt->main_channel_id);
152 256
153 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]); 257 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
154 ff_rtmp_packet_destroy(&pkt); 258 ff_rtmp_packet_destroy(&pkt);
155 } 259 }
156 260
182 286
183 p = pkt.data; 287 p = pkt.data;
184 bytestream_put_be16(&p, 3); 288 bytestream_put_be16(&p, 3);
185 bytestream_put_be32(&p, 1); 289 bytestream_put_be32(&p, 1);
186 bytestream_put_be32(&p, 256); //TODO: what is a good value here? 290 bytestream_put_be32(&p, 256); //TODO: what is a good value here?
291
292 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
293 ff_rtmp_packet_destroy(&pkt);
294 }
295
296 /**
297 * Generates 'publish' call and sends it to the server.
298 */
299 static void gen_publish(URLContext *s, RTMPContext *rt)
300 {
301 RTMPPacket pkt;
302 uint8_t *p;
303
304 av_log(LOG_CONTEXT, AV_LOG_DEBUG, "Sending publish command for '%s'\n", rt->playpath);
305 ff_rtmp_packet_create(&pkt, RTMP_SOURCE_CHANNEL, RTMP_PT_INVOKE, 0,
306 30 + strlen(rt->playpath));
307 pkt.extra = rt->main_channel_id;
308
309 p = pkt.data;
310 ff_amf_write_string(&p, "publish");
311 ff_amf_write_number(&p, 0.0);
312 ff_amf_write_null(&p);
313 ff_amf_write_string(&p, rt->playpath);
314 ff_amf_write_string(&p, "live");
187 315
188 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]); 316 ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
189 ff_rtmp_packet_destroy(&pkt); 317 ff_rtmp_packet_destroy(&pkt);
190 } 318 }
191 319
347 } 475 }
348 476
349 av_log(LOG_CONTEXT, AV_LOG_DEBUG, "Server version %d.%d.%d.%d\n", 477 av_log(LOG_CONTEXT, AV_LOG_DEBUG, "Server version %d.%d.%d.%d\n",
350 serverdata[5], serverdata[6], serverdata[7], serverdata[8]); 478 serverdata[5], serverdata[6], serverdata[7], serverdata[8]);
351 479
480 if (rt->is_input) {
352 server_pos = rtmp_validate_digest(serverdata + 1, 772); 481 server_pos = rtmp_validate_digest(serverdata + 1, 772);
353 if (!server_pos) { 482 if (!server_pos) {
354 server_pos = rtmp_validate_digest(serverdata + 1, 8); 483 server_pos = rtmp_validate_digest(serverdata + 1, 8);
355 if (!server_pos) { 484 if (!server_pos) {
356 av_log(LOG_CONTEXT, AV_LOG_ERROR, "Server response validating failed\n"); 485 av_log(LOG_CONTEXT, AV_LOG_ERROR, "Server response validating failed\n");
378 digest, 32, 507 digest, 32,
379 tosend + RTMP_HANDSHAKE_PACKET_SIZE - 32); 508 tosend + RTMP_HANDSHAKE_PACKET_SIZE - 32);
380 509
381 // write reply back to the server 510 // write reply back to the server
382 url_write(rt->stream, tosend, RTMP_HANDSHAKE_PACKET_SIZE); 511 url_write(rt->stream, tosend, RTMP_HANDSHAKE_PACKET_SIZE);
512 } else {
513 url_write(rt->stream, serverdata+1, RTMP_HANDSHAKE_PACKET_SIZE);
514 }
515
383 return 0; 516 return 0;
384 } 517 }
385 518
386 /** 519 /**
387 * Parses received packet and may perform some action depending on 520 * Parses received packet and may perform some action depending on
399 if (pkt->data_size != 4) { 532 if (pkt->data_size != 4) {
400 av_log(LOG_CONTEXT, AV_LOG_ERROR, 533 av_log(LOG_CONTEXT, AV_LOG_ERROR,
401 "Chunk size change packet is not 4 bytes long (%d)\n", pkt->data_size); 534 "Chunk size change packet is not 4 bytes long (%d)\n", pkt->data_size);
402 return -1; 535 return -1;
403 } 536 }
537 if (!rt->is_input)
538 ff_rtmp_packet_write(rt->stream, pkt, rt->chunk_size, rt->prev_pkt[1]);
404 rt->chunk_size = AV_RB32(pkt->data); 539 rt->chunk_size = AV_RB32(pkt->data);
405 if (rt->chunk_size <= 0) { 540 if (rt->chunk_size <= 0) {
406 av_log(LOG_CONTEXT, AV_LOG_ERROR, "Incorrect chunk size %d\n", rt->chunk_size); 541 av_log(LOG_CONTEXT, AV_LOG_ERROR, "Incorrect chunk size %d\n", rt->chunk_size);
407 return -1; 542 return -1;
408 } 543 }
423 av_log(LOG_CONTEXT, AV_LOG_ERROR, "Server error: %s\n",tmpstr); 558 av_log(LOG_CONTEXT, AV_LOG_ERROR, "Server error: %s\n",tmpstr);
424 return -1; 559 return -1;
425 } else if (!memcmp(pkt->data, "\002\000\007_result", 10)) { 560 } else if (!memcmp(pkt->data, "\002\000\007_result", 10)) {
426 switch (rt->state) { 561 switch (rt->state) {
427 case STATE_HANDSHAKED: 562 case STATE_HANDSHAKED:
563 if (!rt->is_input) {
564 gen_release_stream(s, rt);
565 gen_fcpublish_stream(s, rt);
566 rt->state = STATE_RELEASING;
567 } else {
568 rt->state = STATE_CONNECTING;
569 }
428 gen_create_stream(s, rt); 570 gen_create_stream(s, rt);
571 break;
572 case STATE_FCPUBLISH:
429 rt->state = STATE_CONNECTING; 573 rt->state = STATE_CONNECTING;
430 break; 574 break;
575 case STATE_RELEASING:
576 rt->state = STATE_FCPUBLISH;
577 /* hack for Wowza Media Server, it does not send result for
578 * releaseStream and FCPublish calls */
579 if (!pkt->data[10]) {
580 int pkt_id = (int) av_int2dbl(AV_RB64(pkt->data + 11));
581 if (pkt_id == 4)
582 rt->state = STATE_CONNECTING;
583 }
584 if(rt->state != STATE_CONNECTING)
585 break;
431 case STATE_CONNECTING: 586 case STATE_CONNECTING:
432 //extract a number from the result 587 //extract a number from the result
433 if (pkt->data[10] || pkt->data[19] != 5 || pkt->data[20]) { 588 if (pkt->data[10] || pkt->data[19] != 5 || pkt->data[20]) {
434 av_log(LOG_CONTEXT, AV_LOG_WARNING, "Unexpected reply on connect()\n"); 589 av_log(LOG_CONTEXT, AV_LOG_WARNING, "Unexpected reply on connect()\n");
435 } else { 590 } else {
436 rt->main_channel_id = (int) av_int2dbl(AV_RB64(pkt->data + 21)); 591 rt->main_channel_id = (int) av_int2dbl(AV_RB64(pkt->data + 21));
437 } 592 }
593 if (rt->is_input) {
438 gen_play(s, rt); 594 gen_play(s, rt);
595 } else {
596 gen_publish(s, rt);
597 }
439 rt->state = STATE_READY; 598 rt->state = STATE_READY;
440 break; 599 break;
441 } 600 }
442 } else if (!memcmp(pkt->data, "\002\000\010onStatus", 11)) { 601 } else if (!memcmp(pkt->data, "\002\000\010onStatus", 11)) {
443 const uint8_t* ptr = pkt->data + 11; 602 const uint8_t* ptr = pkt->data + 11;
457 av_log(LOG_CONTEXT, AV_LOG_ERROR, "Server error: %s\n",tmpstr); 616 av_log(LOG_CONTEXT, AV_LOG_ERROR, "Server error: %s\n",tmpstr);
458 return -1; 617 return -1;
459 } 618 }
460 t = ff_amf_get_field_value(ptr, data_end, 619 t = ff_amf_get_field_value(ptr, data_end,
461 "code", tmpstr, sizeof(tmpstr)); 620 "code", tmpstr, sizeof(tmpstr));
462 if (!t && !strcmp(tmpstr, "NetStream.Play.Start")) { 621 if (!t && !strcmp(tmpstr, "NetStream.Play.Start")) rt->state = STATE_PLAYING;
463 rt->state = STATE_PLAYING; 622 if (!t && !strcmp(tmpstr, "NetStream.Publish.Start")) rt->state = STATE_PUBLISHING;
464 return 0;
465 }
466 } 623 }
467 break; 624 break;
468 } 625 }
469 return 0; 626 return 0;
470 } 627 }
499 ret = rtmp_parse_result(s, rt, &rpkt); 656 ret = rtmp_parse_result(s, rt, &rpkt);
500 if (ret < 0) {//serious error in current packet 657 if (ret < 0) {//serious error in current packet
501 ff_rtmp_packet_destroy(&rpkt); 658 ff_rtmp_packet_destroy(&rpkt);
502 return -1; 659 return -1;
503 } 660 }
504 if (for_header && rt->state == STATE_PLAYING) { 661 if (for_header && (rt->state == STATE_PLAYING || rt->state == STATE_PUBLISHING)) {
505 ff_rtmp_packet_destroy(&rpkt); 662 ff_rtmp_packet_destroy(&rpkt);
506 return 0; 663 return 0;
507 } 664 }
508 if (!rpkt.data_size) { 665 if (!rpkt.data_size || !rt->is_input) {
509 ff_rtmp_packet_destroy(&rpkt); 666 ff_rtmp_packet_destroy(&rpkt);
510 continue; 667 continue;
511 } 668 }
512 if (rpkt.type == RTMP_PT_VIDEO || rpkt.type == RTMP_PT_AUDIO || 669 if (rpkt.type == RTMP_PT_VIDEO || rpkt.type == RTMP_PT_AUDIO ||
513 (rpkt.type == RTMP_PT_NOTIFY && !memcmp("\002\000\012onMetaData", rpkt.data, 13))) { 670 (rpkt.type == RTMP_PT_NOTIFY && !memcmp("\002\000\012onMetaData", rpkt.data, 13))) {
543 700
544 static int rtmp_close(URLContext *h) 701 static int rtmp_close(URLContext *h)
545 { 702 {
546 RTMPContext *rt = h->priv_data; 703 RTMPContext *rt = h->priv_data;
547 704
705 if(!rt->is_input) {
706 rt->flv_data = NULL;
707 if (rt->out_pkt.data_size)
708 ff_rtmp_packet_destroy(&rt->out_pkt);
709 gen_fcunpublish_stream(h, rt);
710 }
711 gen_delete_stream(h, rt);
712
548 av_freep(&rt->flv_data); 713 av_freep(&rt->flv_data);
549 url_close(rt->stream); 714 url_close(rt->stream);
550 av_free(rt); 715 av_free(rt);
551 return 0; 716 return 0;
552 } 717 }
584 if (url_open(&rt->stream, buf, URL_RDWR) < 0) { 749 if (url_open(&rt->stream, buf, URL_RDWR) < 0) {
585 av_log(LOG_CONTEXT, AV_LOG_ERROR, "Cannot open connection %s\n", buf); 750 av_log(LOG_CONTEXT, AV_LOG_ERROR, "Cannot open connection %s\n", buf);
586 goto fail; 751 goto fail;
587 } 752 }
588 753
589 if (!rt->is_input) {
590 av_log(LOG_CONTEXT, AV_LOG_ERROR, "RTMP output is not supported yet.\n");
591 goto fail;
592 } else {
593 rt->state = STATE_START; 754 rt->state = STATE_START;
594 if (rtmp_handshake(s, rt)) 755 if (rtmp_handshake(s, rt))
595 return -1; 756 return -1;
596 757
597 rt->chunk_size = 128; 758 rt->chunk_size = 128;
633 do { 794 do {
634 ret = get_packet(s, 1); 795 ret = get_packet(s, 1);
635 } while (ret == EAGAIN); 796 } while (ret == EAGAIN);
636 if (ret < 0) 797 if (ret < 0)
637 goto fail; 798 goto fail;
799
800 if (rt->is_input) {
638 // generate FLV header for demuxer 801 // generate FLV header for demuxer
639 rt->flv_size = 13; 802 rt->flv_size = 13;
640 rt->flv_data = av_realloc(rt->flv_data, rt->flv_size); 803 rt->flv_data = av_realloc(rt->flv_data, rt->flv_size);
641 rt->flv_off = 0; 804 rt->flv_off = 0;
642 memcpy(rt->flv_data, "FLV\1\5\0\0\0\011\0\0\0\0", rt->flv_size); 805 memcpy(rt->flv_data, "FLV\1\5\0\0\0\011\0\0\0\0", rt->flv_size);
806 } else {
807 rt->flv_size = 0;
808 rt->flv_data = NULL;
809 rt->flv_off = 0;
643 } 810 }
644 811
645 s->max_packet_size = url_get_max_packet_size(rt->stream); 812 s->max_packet_size = url_get_max_packet_size(rt->stream);
646 s->is_streamed = 1; 813 s->is_streamed = 1;
647 return 0; 814 return 0;
677 return orig_size; 844 return orig_size;
678 } 845 }
679 846
680 static int rtmp_write(URLContext *h, uint8_t *buf, int size) 847 static int rtmp_write(URLContext *h, uint8_t *buf, int size)
681 { 848 {
682 return 0; 849 RTMPContext *rt = h->priv_data;
850 int size_temp = size;
851 int pktsize, pkttype;
852 uint32_t ts;
853 const uint8_t *buf_temp = buf;
854
855 if (size < 11) {
856 av_log(LOG_CONTEXT, AV_LOG_DEBUG, "FLV packet too small %d\n", size);
857 return 0;
858 }
859
860 do {
861 if (!rt->flv_off) {
862 //skip flv header
863 if (buf_temp[0] == 'F' && buf_temp[1] == 'L' && buf_temp[2] == 'V') {
864 buf_temp += 9 + 4;
865 size_temp -= 9 + 4;
866 }
867
868 pkttype = bytestream_get_byte(&buf_temp);
869 pktsize = bytestream_get_be24(&buf_temp);
870 ts = bytestream_get_be24(&buf_temp);
871 ts |= bytestream_get_byte(&buf_temp) << 24;
872 bytestream_get_be24(&buf_temp);
873 size_temp -= 11;
874 rt->flv_size = pktsize;
875
876 //force 12bytes header
877 if (((pkttype == RTMP_PT_VIDEO || pkttype == RTMP_PT_AUDIO) && ts == 0) ||
878 pkttype == RTMP_PT_NOTIFY) {
879 if (pkttype == RTMP_PT_NOTIFY)
880 pktsize += 16;
881 rt->prev_pkt[1][RTMP_SOURCE_CHANNEL].channel_id = 0;
882 }
883
884 //this can be a big packet, it's better to send it right here
885 ff_rtmp_packet_create(&rt->out_pkt, RTMP_SOURCE_CHANNEL, pkttype, ts, pktsize);
886 rt->out_pkt.extra = rt->main_channel_id;
887 rt->flv_data = rt->out_pkt.data;
888
889 if (pkttype == RTMP_PT_NOTIFY)
890 ff_amf_write_string(&rt->flv_data, "@setDataFrame");
891 }
892
893 if (rt->flv_size - rt->flv_off > size_temp) {
894 bytestream_get_buffer(&buf_temp, rt->flv_data + rt->flv_off, size_temp);
895 rt->flv_off += size_temp;
896 } else {
897 bytestream_get_buffer(&buf_temp, rt->flv_data + rt->flv_off, rt->flv_size - rt->flv_off);
898 rt->flv_off += rt->flv_size - rt->flv_off;
899 }
900
901 if (rt->flv_off == rt->flv_size) {
902 bytestream_get_be32(&buf_temp);
903
904 ff_rtmp_packet_write(rt->stream, &rt->out_pkt, rt->chunk_size, rt->prev_pkt[1]);
905 ff_rtmp_packet_destroy(&rt->out_pkt);
906 rt->flv_size = 0;
907 rt->flv_off = 0;
908 }
909 } while (buf_temp - buf < size_temp);
910 return size;
683 } 911 }
684 912
685 URLProtocol rtmp_protocol = { 913 URLProtocol rtmp_protocol = {
686 "rtmp", 914 "rtmp",
687 rtmp_open, 915 rtmp_open,