SRS Streaming Server: Creating RTMP Publish (Push) and Play (Pull) Connections
Contents
- Identify the client, then fetch or create its SrsLiveSource
- Starting the publish (push) stream
- Starting the play (pull) stream
Please read this series in the following order:
- SRS Streaming Server: Analysis of the RTMP Port Listening Logic
- SRS Streaming Server: Creating RTMP Publish (Push) and Play (Pull) Connections
- SRS Streaming Server: How the Server Reads RTMP Publish Stream Data
- SRS Streaming Server: How the Server Forwards RTMP Data to Players
1. Creating RTMP Publish and Play Connections
- Both RTMP publishing and playing create a connection object of type SrsRtmpConn; see the SrsServer::fd_to_resource function discussed earlier.
- Each SrsRtmpConn is bound to one SrsCoroutine, and the actual business logic runs inside that coroutine; for RTMP, the loop the coroutine ultimately runs is the SrsRtmpConn::cycle function.
- SrsRtmpConn::cycle in turn ends up calling SrsRtmpConn::do_cycle, as sketched below.
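- A minimal sketch of that binding (simplified and paraphrased; names such as trd, SrsSTCoroutine and ISrsCoroutineHandler follow the SRS sources, but the bodies here are illustrative only): the connection owns a coroutine, start() launches it, and the coroutine entry point is cycle().
// Illustrative sketch, not the real SrsRtmpConn declaration.
class SrsRtmpConnSketch : public ISrsCoroutineHandler
{
public:
    SrsRtmpConnSketch() {
        // Each connection owns one ST coroutine whose handler is this object.
        trd = new SrsSTCoroutine("rtmp", this);
    }
    virtual ~SrsRtmpConnSketch() {
        srs_freep(trd);
    }
    // Called by the server after the TCP connection is accepted.
    srs_error_t start() {
        return trd->start();
    }
    // Coroutine entry point: the whole RTMP session runs here.
    virtual srs_error_t cycle() {
        srs_error_t err = do_cycle(); // handshake, connect, service loop
        // ... log and map the error, then let the coroutine exit cleanly.
        srs_freep(err);
        return srs_success;
    }
private:
    srs_error_t do_cycle();
    SrsCoroutine* trd;
};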
srs_error_t SrsRtmpConn::do_cycle()
{
srs_error_t err = srs_success;
srs_trace("RTMP client ip=%s:%d, fd=%d", ip.c_str(), port, srs_netfd_fileno(stfd));
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
// RTMP handshake logic
if ((err = rtmp->handshake()) != srs_success) {
return srs_error_wrap(err, "rtmp handshake");
}
uint32_t rip = rtmp->proxy_real_ip();
if (rip > 0) {
srs_trace("RTMP proxy real client ip=%d.%d.%d.%d", uint8_t(rip>>24), uint8_t(rip>>16), uint8_t(rip>>8), uint8_t(rip));
}
SrsRequest* req = info->req;
if ((err = rtmp->connect_app(req)) != srs_success) {
// receive the connect request
return srs_error_wrap(err, "rtmp connect tcUrl");
}
// set client ip to request.
req->ip = ip;
srs_trace("connect app, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, args=%s",
req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
req->schema.c_str(), req->vhost.c_str(), req->port,
req->app.c_str(), (req->args? "(obj)":"null"));
...
if ((err = service_cycle()) != srs_success) {
err = srs_error_wrap(err, "service cycle");
}
srs_error_t r0 = srs_success;
if ((r0 = on_disconnect()) != srs_success) {
err = srs_error_wrap(err, "on disconnect %s", srs_error_desc(r0).c_str());
srs_freep(r0);
}
// If client is redirect to other servers, we already logged the event.
if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {
srs_error_reset(err);
}
return err;
}
- SrsRtmpConn::do_cycle mainly performs the RTMP handshake and receives the connect request; once the connection is judged valid, it calls SrsRtmpConn::service_cycle.
- TODO: a follow-up article will cover the RTMP handshake process in detail, including the interaction of command messages and protocol control messages.
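- For reference, a minimal sketch of the simple (non-complex) handshake that rtmp->handshake() must complete. The byte layout follows the RTMP specification; the helper functions below are illustrative standalone code, not SRS code.
#include <stddef.h>
#include <stdint.h>
#include <sys/types.h>
#include <unistd.h>

// Read/write exactly n bytes on a blocking socket (simplified helpers).
static bool read_fully(int fd, void* buf, size_t n) {
    uint8_t* p = (uint8_t*)buf;
    while (n > 0) {
        ssize_t r = read(fd, p, n);
        if (r <= 0) return false;
        p += r;
        n -= (size_t)r;
    }
    return true;
}
static bool write_fully(int fd, const void* buf, size_t n) {
    const uint8_t* p = (const uint8_t*)buf;
    while (n > 0) {
        ssize_t r = write(fd, p, n);
        if (r <= 0) return false;
        p += r;
        n -= (size_t)r;
    }
    return true;
}

// Server side of the simple handshake: read C0+C1, send S0+S1+S2, read C2.
// C0/S0: 1-byte version (0x03). C1/S1: 1536 bytes = time(4) + zero(4) + random(1528).
// C2/S2: 1536 bytes echoing the peer's C1/S1.
static bool simple_handshake_server(int fd) {
    uint8_t c0;
    uint8_t c1[1536];
    if (!read_fully(fd, &c0, 1) || c0 != 0x03) return false; // C0
    if (!read_fully(fd, c1, sizeof(c1))) return false;       // C1

    uint8_t s0 = 0x03;
    uint8_t s1[1536] = {0}; // all-zero time/random bytes are legal, if unimaginative
    if (!write_fully(fd, &s0, 1)) return false;               // S0
    if (!write_fully(fd, s1, sizeof(s1))) return false;       // S1
    if (!write_fully(fd, c1, sizeof(c1))) return false;       // S2 echoes C1
    uint8_t c2[1536];
    return read_fully(fd, c2, sizeof(c2));                    // C2 echoes S1
}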
srs_error_t SrsRtmpConn::service_cycle()
{
srs_error_t err = srs_success;
SrsRequest* req = info->req;
int out_ack_size = _srs_config->get_out_ack_size(req->vhost);
if (out_ack_size && (err = rtmp->set_window_ack_size(out_ack_size)) != srs_success) {
return srs_error_wrap(err, "rtmp: set out window ack size");
}
int in_ack_size = _srs_config->get_in_ack_size(req->vhost);
if (in_ack_size && (err = rtmp->set_in_window_ack_size(in_ack_size)) != srs_success) {
return srs_error_wrap(err, "rtmp: set in window ack size");
}
if ((err = rtmp->set_peer_bandwidth((int)(2.5 * 1000 * 1000), 2)) != srs_success) {
return srs_error_wrap(err, "rtmp: set peer bandwidth");
}
// get the ip which client connected.
std::string local_ip = srs_get_local_ip(srs_netfd_fileno(stfd));
// do bandwidth test if connect to the vhost which is for bandwidth check.
if (_srs_config->get_bw_check_enabled(req->vhost)) {
if ((err = bandwidth->bandwidth_check(rtmp, skt, req, local_ip)) != srs_success) {
return srs_error_wrap(err, "rtmp: bandwidth check");
}
return err;
}
// set chunk size to larger.
// set the chunk size before any larger response greater than 128,
// to make OBS happy, @see https://github.com/ossrs/srs/issues/454
int chunk_size = _srs_config->get_chunk_size(req->vhost);
if ((err = rtmp->set_chunk_size(chunk_size)) != srs_success) {
return srs_error_wrap(err, "rtmp: set chunk size %d", chunk_size);
}
// response the client connect ok (reply to the client's connect request)
if ((err = rtmp->response_connect_app(req, local_ip.c_str())) != srs_success) {
return srs_error_wrap(err, "rtmp: response connect app");
}
if ((err = rtmp->on_bw_done()) != srs_success) {
return srs_error_wrap(err, "rtmp: on bw down");
}
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "rtmp: thread quit");
}
err = stream_service_cycle();
// stream service must terminated with error, never success.
// when terminated with success, it's user required to stop.
// TODO: FIXME: Support RTMP client timeout, https://github.com/ossrs/srs/issues/1134
if (err == srs_success) {
continue;
}
...
// for other system control message, fatal error.
return srs_error_wrap(err, "rtmp: reject");
}
return err;
}
- After the protocol control messages (window ack size, peer bandwidth, chunk size) have been set, execution enters SrsRtmpConn::stream_service_cycle, which runs the stream service loop.
- It first identifies the client, then fetches or creates the SrsLiveSource.
- It then uses SrsRtmpConnType to decide whether the connection is publishing or playing, and runs the publish logic (SrsRtmpConn::publishing) or the play logic (SrsRtmpConn::playing) accordingly; the connection types are sketched below.
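- The connection types that stream_service_cycle switches on look roughly like the following; this is reconstructed from the values used in the code below, and the exact declaration in the SRS sources may differ.
// Reconstructed sketch of SrsRtmpConnType; the values are those referenced below.
enum SrsRtmpConnType
{
    SrsRtmpConnUnknown,          // not identified yet
    SrsRtmpConnPlay,             // player (pull stream)
    SrsRtmpConnFMLEPublish,      // FMLE-style publisher (OBS/FFmpeg: releaseStream + FCPublish)
    SrsRtmpConnFlashPublish,     // Flash-style publisher
    SrsRtmpConnHaivisionPublish, // Haivision encoder publisher
};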
srs_error_t SrsRtmpConn::stream_service_cycle()
{
srs_error_t err = srs_success;
SrsRequest* req = info->req;
// identify the client
if ((err = rtmp->identify_client(info->res->stream_id, info->type, req->stream, req->duration)) != srs_success) {
return srs_error_wrap(err, "rtmp: identify client");
}
srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param);
req->strip();
srs_trace("client identified, type=%s, vhost=%s, app=%s, stream=%s, param=%s, duration=%dms",
srs_client_type_string(info->type).c_str(), req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), req->param.c_str(), srsu2msi(req->duration));
// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
if (parsed_vhost) {
req->vhost = parsed_vhost->arg0();
}
if (req->schema.empty() || req->vhost.empty() || req->port == 0 || req->app.empty()) {
return srs_error_new(ERROR_RTMP_REQ_TCURL, "discovery tcUrl failed, tcUrl=%s, schema=%s, vhost=%s, port=%d, app=%s",
req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port, req->app.c_str());
}
// check vhost, allow default vhost.
if ((err = check_vhost(true)) != srs_success) {
return srs_error_wrap(err, "check vhost");
}
srs_trace("connected stream, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, stream=%s, param=%s, args=%s",
req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port,
req->app.c_str(), req->stream.c_str(), req->param.c_str(), (req->args? "(obj)":"null"));
// do token traverse before serve it.
// @see https://github.com/ossrs/srs/pull/239
if (true) {
info->edge = _srs_config->get_vhost_is_edge(req->vhost);
bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost);
if (info->edge && edge_traverse) {
if ((err = check_edge_token_traverse_auth()) != srs_success) {
return srs_error_wrap(err, "rtmp: check token traverse");
}
}
}
// security check
if ((err = security->check(info->type, ip, req)) != srs_success) {
return srs_error_wrap(err, "rtmp: security check");
}
// Never allow the empty stream name, for HLS may write to a file with empty name.
// @see https://github.com/ossrs/srs/issues/834
if (req->stream.empty()) {
return srs_error_new(ERROR_RTMP_STREAM_NAME_EMPTY, "rtmp: empty stream");
}
// client is identified, set the timeout to service timeout.
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
// find a source to serve.
SrsLiveSource* source = NULL;
if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) {
// fetch or create the SrsLiveSource
return srs_error_wrap(err, "rtmp: fetch source");
}
srs_assert(source != NULL);
bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%s/%s",
req->get_stream_url().c_str(), ip.c_str(), enabled_cache, info->edge, source->source_id().c_str(), source->pre_source_id().c_str());
source->set_cache(enabled_cache);
switch (info->type) {
case SrsRtmpConnPlay: {
// response connection start play
if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: start play");
}
if ((err = http_hooks_on_play()) != srs_success) {
return srs_error_wrap(err, "rtmp: callback on play");
}
err = playing(source);
http_hooks_on_stop();
return err;
}
case SrsRtmpConnFMLEPublish: {
if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: start FMLE publish");
}
return publishing(source);
}
case SrsRtmpConnHaivisionPublish: {
if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: start HAIVISION publish");
}
return publishing(source);
}
case SrsRtmpConnFlashPublish: {
if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: start FLASH publish");
}
return publishing(source);
}
default: {
return srs_error_new(ERROR_SYSTEM_CLIENT_INVALID, "rtmp: unknown client type=%d", info->type);
}
}
return err;
}
1. Identify the client, then fetch or create the SrsLiveSource
- SrsRtmpServer::identify_client is responsible for identifying the client.
- When the command message is releaseStream (the first command an FMLE-style publisher sends), decoding yields an SrsFMLEStartPacket and the connection is treated as a publisher (push).
- When the command message is play, decoding yields an SrsPlayPacket and the connection is treated as a player (pull).
srs_error_t SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type, string& stream_name, srs_utime_t& duration)
{
type = SrsRtmpConnUnknown;
srs_error_t err = srs_success;
while (true) {
SrsCommonMessage* msg = NULL;
if ((err = protocol->recv_message(&msg)) != srs_success) {
return srs_error_wrap(err, "recv identify message");
}
SrsAutoFree(SrsCommonMessage, msg);
SrsMessageHeader& h = msg->header;
if (h.is_ackledgement() || h.is_set_chunk_size() || h.is_window_ackledgement_size() || h.is_user_control_message()) {
continue;
}
if (!h.is_amf0_command() && !h.is_amf3_command()) {
srs_trace("ignore message type=%#x", h.message_type);
continue;
}
SrsPacket* pkt = NULL;
if ((err = protocol->decode_message(msg, &pkt)) != srs_success) {
return srs_error_wrap(err, "decode identify");
}
SrsAutoFree(SrsPacket, pkt);
if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) {
return identify_create_stream_client(dynamic_cast<SrsCreateStreamPacket*>(pkt), stream_id, 3, type, stream_name, duration);
}
if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
//an RTMP publisher first sends the releaseStream command message; decoding it yields an SrsFMLEStartPacket
return identify_fmle_publish_client(dynamic_cast<SrsFMLEStartPacket*>(pkt), type, stream_name);
}
if (dynamic_cast<SrsPlayPacket*>(pkt)) {
//an RTMP player sends the play command message; decoding it yields an SrsPlayPacket
return identify_play_client(dynamic_cast<SrsPlayPacket*>(pkt), type, stream_name, duration);
}
// call msg,
// support response null first,
// @see https://github.com/ossrs/srs/issues/106
// TODO: FIXME: response in right way, or forward in edge mode.
SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);
if (call) {
SrsCallResPacket* res = new SrsCallResPacket(call->transaction_id);
res->command_object = SrsAmf0Any::null();
res->response = SrsAmf0Any::null();
if ((err = protocol->send_and_free_packet(res, 0)) != srs_success) {
return srs_error_wrap(err, "response call");
}
// For encoder of Haivision, it always send a _checkbw call message.
// @remark the next message is createStream, so we continue to identify it.
// @see https://github.com/ossrs/srs/issues/844
if (call->command_name == "_checkbw") {
continue;
}
continue;
}
srs_trace("ignore AMF0/AMF3 command message.");
}
return err;
}
- SrsProtocol::do_decode_message decodes the various message formats, for example producing the SrsFMLEStartPacket object mentioned above. The decoding code is below; a sketch of the AMF0 command-name layout follows it.
srs_error_t SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsBuffer* stream, SrsPacket** ppacket)
{
srs_error_t err = srs_success;
SrsPacket* packet = NULL;
// decode specified packet type
if (header.is_amf0_command() || header.is_amf3_command() || header.is_amf0_data() || header.is_amf3_data()) {
// skip 1bytes to decode the amf3 command.
if (header.is_amf3_command() && stream->require(1)) {
stream->skip(1);
}
...
// reset to zero(amf3 to 1) to restart decode.
stream->skip(-1 * stream->pos());
if (header.is_amf3_command()) {
stream->skip(1);
}
// decode command object.
if (command == RTMP_AMF0_COMMAND_CONNECT) {
*ppacket = packet = new SrsConnectAppPacket();
return packet->decode(stream);
} else if (command == RTMP_AMF0_COMMAND_CREATE_STREAM) {
*ppacket = packet = new SrsCreateStreamPacket();
return packet->decode(stream);
} else if (command == RTMP_AMF0_COMMAND_PLAY) {
*ppacket = packet = new SrsPlayPacket(); // RTMP play (pull)
return packet->decode(stream);
} else if (command == RTMP_AMF0_COMMAND_PAUSE) {
*ppacket = packet = new SrsPausePacket();
return packet->decode(stream);
} else if (command == RTMP_AMF0_COMMAND_RELEASE_STREAM) {
*ppacket = packet = new SrsFMLEStartPacket(); // RTMP publish (push)
return packet->decode(stream);
} else if (command == RTMP_AMF0_COMMAND_FC_PUBLISH) {
....
return err;
}
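- To make the dispatch above concrete: the first value in an AMF0 command message body is the command name, encoded as an AMF0 string (1-byte marker 0x02, 2-byte big-endian length, then the characters). The helper below is an illustrative standalone parser, not SRS code.
#include <stddef.h>
#include <stdint.h>
#include <string>

// Read the AMF0 command name at the start of a command message body.
static bool read_amf0_command_name(const uint8_t* buf, size_t len, std::string& name) {
    if (len < 3 || buf[0] != 0x02) return false;      // 0x02 = AMF0 string marker
    uint16_t n = (uint16_t)((buf[1] << 8) | buf[2]);  // big-endian length
    if (len < (size_t)(3 + n)) return false;
    name.assign((const char*)buf + 3, n);             // e.g. "connect", "play", "releaseStream"
    return true;
}
- do_decode_message reads this name first and then news the matching SrsPacket (SrsConnectAppPacket, SrsPlayPacket, SrsFMLEStartPacket, ...), whose decode() continues with the transaction id and command object.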
- The call stack for decoding this command message on the publish side is shown below; the play side goes through the same path.
(gdb) bt
#0 SrsFMLEStartPacket::SrsFMLEStartPacket (this=0x105beb0) at src/protocol/srs_rtmp_stack.cpp:3556
#1 0x0000000000483841 in SrsProtocol::do_decode_message (this=0x1040110, header=..., stream=0x105a250, ppacket=0x105a240) at src/protocol/srs_rtmp_stack.cpp:700
#2 0x00000000004826ed in SrsProtocol::decode_message (this=0x1040110, msg=0x105bc40, ppacket=0x105a2c8) at src/protocol/srs_rtmp_stack.cpp:413
#3 0x000000000048d851 in SrsRtmpServer::identify_client (this=0x10400e0, stream_id=1, type=@0x1049998: SrsRtmpConnUnknown, stream_name="", duration=@0x1049b18: -1)
at src/protocol/srs_rtmp_stack.cpp:2510
#4 0x000000000050090c in SrsRtmpConn::stream_service_cycle (this=0x103fcc0) at src/app/srs_app_rtmp_conn.cpp:454
#5 0x0000000000500512 in SrsRtmpConn::service_cycle (this=0x103fcc0) at src/app/srs_app_rtmp_conn.cpp:403
#6 0x00000000004fefd1 in SrsRtmpConn::do_cycle (this=0x103fcc0) at src/app/srs_app_rtmp_conn.cpp:216
#7 0x0000000000507f59 in SrsRtmpConn::cycle (this=0x103fcc0) at src/app/srs_app_rtmp_conn.cpp:1457
#8 0x00000000005379d8 in SrsFastCoroutine::cycle (this=0x1040040) at src/app/srs_app_st.cpp:272
#9 0x0000000000537a6e in SrsFastCoroutine::pfn (arg=0x1040040) at src/app/srs_app_st.cpp:287
#10 0x000000000065010b in _st_thread_main () at sched.c:363
#11 0x0000000000650988 in st_thread_create (start=0xf78f00, arg=0x101040040, joinable=0, stk_size=6640311) at sched.c:694
Backtrace stopped: previous frame inner to this frame (corrupt stack?)
- When the object returned by SrsProtocol::decode_message is an SrsFMLEStartPacket, SrsRtmpServer::identify_fmle_publish_client is called and type is set to SrsRtmpConnFMLEPublish, which means the connection is a publisher (push).
srs_error_t SrsRtmpServer::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsRtmpConnType& type, string& stream_name)
{
srs_error_t err = srs_success;
type = SrsRtmpConnFMLEPublish;
stream_name = req->stream_name;
// releaseStream response
if (true) {
SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id);
if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) {
return srs_error_wrap(err, "send releaseStream response");
}
}
return err;
}
- When the object returned by SrsProtocol::decode_message is an SrsPlayPacket, SrsRtmpServer::identify_play_client is called and type is set to SrsRtmpConnPlay, which means the connection is a player (pull).
srs_error_t SrsRtmpServer::identify_play_client(SrsPlayPacket* req, SrsRtmpConnType& type, string& stream_name, srs_utime_t& duration)
{
type = SrsRtmpConnPlay;
stream_name = req->stream_name;
duration = srs_utime_t(req->duration) * SRS_UTIME_MILLISECONDS;
return srs_success;
}
- Fetch the SrsLiveSource, or create it if it does not exist yet. It is created when publishing: each publish stream corresponds to exactly one source, and that one source serves all of its players. A sketch of how the pool key is built follows the code below.
srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceHandler* h, SrsLiveSource** pps)
{
srs_error_t err = srs_success;
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
// TODO: FIXME: Use smaller lock.
SrsLocker(lock);
SrsLiveSource* source = NULL;
if ((source = fetch(r)) != NULL) {
//look up the SrsLiveSource in the SrsLiveSourceManager pool (a map) keyed by stream_url
// we always update the request of resource,
// for origin auth is on, the token in request maybe invalid,
// and we only need to update the token of request, it's simple.
source->update_auth(r);
*pps = source;
return err;
}
string stream_url = r->get_stream_url();
string vhost = r->vhost;
// should always not exists for create a source.
srs_assert (pool.find(stream_url) == pool.end());
srs_trace("new source, stream_url=%s", stream_url.c_str());
source = new SrsLiveSource(); // not found: create a new source and initialize it
if ((err = source->initialize(r, h)) != srs_success) {
err = srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
goto failed;
}
pool[stream_url] = source;
*pps = source;
return err;
failed:
srs_freep(source);
return err;
}
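- The pool is keyed by the stream URL derived from the request. Roughly (a paraphrase of srs_generate_stream_url; the exact behaviour may differ between SRS versions), the key concatenates vhost, app and stream, so one publish URL maps to exactly one SrsLiveSource.
#include <string>

// Rough sketch of how the pool key is built (paraphrased, not verbatim SRS code).
// The default vhost ("__defaultVhost__") is omitted so plain URLs stay short.
static std::string generate_stream_url(const std::string& vhost,
                                       const std::string& app,
                                       const std::string& stream) {
    std::string url;
    if (vhost != "__defaultVhost__") {
        url += vhost;
    }
    url += "/" + app + "/" + stream;
    return url; // e.g. "/live/livestream" or "example.com/live/livestream"
}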
2. Starting the Publish (Push) Stream
- Publishing eventually goes through SrsRtmpConn::publishing, which calls SrsRtmpConn::do_publishing; the call stack is:
(gdb) bt
#0 SrsRtmpConn::do_publishing (this=0x103fcc0, source=0x105c250, rtrd=0x105a280) at src/app/srs_app_rtmp_conn.cpp:857
#1 0x000000000050432a in SrsRtmpConn::publishing (this=0x103fcc0, source=0x105c250) at src/app/srs_app_rtmp_conn.cpp:838
#2 0x00000000005016dd in SrsRtmpConn::stream_service_cycle (this=0x103fcc0) at src/app/srs_app_rtmp_conn.cpp:542
#3 0x0000000000500512 in SrsRtmpConn::service_cycle (this=0x103fcc0) at src/app/srs_app_rtmp_conn.cpp:403
#4 0x00000000004fefd1 in SrsRtmpConn::do_cycle (this=0x103fcc0) at src/app/srs_app_rtmp_conn.cpp:216
#5 0x0000000000507f59 in SrsRtmpConn::cycle (this=0x103fcc0) at src/app/srs_app_rtmp_conn.cpp:1457
#6 0x00000000005379d8 in SrsFastCoroutine::cycle (this=0x1040040) at src/app/srs_app_st.cpp:272
#7 0x0000000000537a6e in SrsFastCoroutine::pfn (arg=0x1040040) at src/app/srs_app_st.cpp:287
#8 0x000000000065010b in _st_thread_main () at sched.c:363
#9 0x0000000000650988 in st_thread_create (start=0xf78f00, arg=0x101040040, joinable=0, stk_size=6640311) at sched.c:694
Backtrace stopped: previous frame inner to this frame (corrupt stack?)
- SrsRtmpConn::publishing creates an SrsPublishRecvThread coroutine to receive data, and SrsRtmpConn::do_publishing then starts that receive coroutine.
srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
{
srs_error_t err = srs_success;
SrsRequest* req = info->req;
if (_srs_config->get_refer_enabled(req->vhost)) {
if ((err = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != srs_success) {
return srs_error_wrap(err, "rtmp: referer check");
}
}
if ((err = http_hooks_on_publish()) != srs_success) {
return srs_error_wrap(err, "rtmp: callback on publish");
}
// TODO: FIXME: Should refine the state of publishing.
if ((err = acquire_publish(source)) == srs_success) {
// use isolate thread to recv,
// @see: https://github.com/ossrs/srs/issues/237
SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());
err = do_publishing(source, &rtrd);
rtrd.stop();
}
// whatever the acquire publish, always release publish.
// when the acquire error in the midlle-way, the publish state changed,
// but failed, so we must cleanup it.
// @see https://github.com/ossrs/srs/issues/474
// @remark when stream is busy, should never release it.
if (srs_error_code(err) != ERROR_SYSTEM_STREAM_BUSY) {
release_publish(source);
}
http_hooks_on_unpublish();
return err;
}
srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThread* rtrd)
{
srs_error_t err = srs_success;
SrsRequest* req = info->req;
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
SrsAutoFree(SrsPithyPrint, pprint);
// update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info->type)) != srs_success) {
return srs_error_wrap(err, "rtmp: stat client");
}
// start isolate recv thread.
// TODO: FIXME: Pass the callback here.
if ((err = rtrd->start()) != srs_success) {
return srs_error_wrap(err, "rtmp: receive thread");
}
...
return err;
}
- Execution eventually reaches SrsRecvThread::do_cycle; the details are covered in part 3, How the Server Reads RTMP Publish Stream Data. A conceptual sketch of the pattern follows.
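- A conceptual sketch of that isolated-receive pattern (heavily simplified; the real SrsRecvThread/SrsPublishRecvThread add queuing, timeout handling and error merging, and the interface names here are illustrative, not real SRS declarations): a dedicated coroutine loops on recv_message and hands every message to a handler, so the connection's main coroutine is not blocked on network reads.
// Illustrative only: a dedicated coroutine that pulls RTMP messages for the publisher.
class RecvThreadSketch : public ISrsCoroutineHandler
{
public:
    // handler is whatever consumes the messages, e.g. code that feeds the SrsLiveSource.
    RecvThreadSketch(SrsRtmpServer* rtmp, ISrsMessageConsumer* handler)
        : rtmp_(rtmp), handler_(handler) {
        trd_ = new SrsSTCoroutine("recv", this);
    }
    virtual ~RecvThreadSketch() {
        srs_freep(trd_);
    }
    srs_error_t start() { return trd_->start(); }
    void stop() { trd_->stop(); }
    // Coroutine entry: keep receiving RTMP messages and pass them on.
    virtual srs_error_t cycle() {
        srs_error_t err = srs_success;
        while ((err = trd_->pull()) == srs_success) {
            SrsCommonMessage* msg = NULL;
            if ((err = rtmp_->recv_message(&msg)) != srs_success) {
                return srs_error_wrap(err, "recv message");
            }
            // For a publisher this ultimately reaches SrsLiveSource::on_audio/on_video.
            err = handler_->consume(msg);
            srs_freep(msg);
            if (err != srs_success) {
                return srs_error_wrap(err, "consume message");
            }
        }
        return err;
    }
private:
    SrsRtmpServer* rtmp_;
    ISrsMessageConsumer* handler_;
    SrsCoroutine* trd_;
};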
3. Starting the Play (Pull) Stream
- Playing eventually goes through SrsRtmpConn::playing, which calls SrsRtmpConn::do_playing; the call stack is below, and a simplified sketch of the play loop follows it.
(gdb) bt
#0 SrsRtmpConn::do_playing (this=0x10acda0, source=0x105c280, consumer=0x10c8b60, rtrd=0x10c7230) at src/app/srs_app_rtmp_conn.cpp:688
#1 0x0000000000502ccb in SrsRtmpConn::playing (this=0x10acda0, source=0x105c280) at src/app/srs_app_rtmp_conn.cpp:674
#2 0x0000000000501600 in SrsRtmpConn::stream_service_cycle (this=0x10acda0) at src/app/srs_app_rtmp_conn.cpp:532
#3 0x0000000000500512 in SrsRtmpConn::service_cycle (this=0x10acda0) at src/app/srs_app_rtmp_conn.cpp:403
#4 0x00000000004fefd1 in SrsRtmpConn::do_cycle (this=0x10acda0) at src/app/srs_app_rtmp_conn.cpp:216
#5 0x0000000000507f59 in SrsRtmpConn::cycle (this=0x10acda0) at src/app/srs_app_rtmp_conn.cpp:1457
#6 0x00000000005379d8 in SrsFastCoroutine::cycle (this=0x10ad050) at src/app/srs_app_st.cpp:272
#7 0x0000000000537a6e in SrsFastCoroutine::pfn (arg=0x10ad050) at src/app/srs_app_st.cpp:287
#8 0x000000000065010b in _st_thread_main () at sched.c:363
#9 0x0000000000650988 in st_thread_create (start=0xf78f00, arg=0x1010ad050, joinable=0, stk_size=6640311) at sched.c:694
Backtrace stopped: previous frame inner to this frame (corrupt stack?)
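- As a preview of part 4, a heavily simplified sketch of what SrsRtmpConn::do_playing boils down to (the real code also handles user control messages, pause, send timeouts, mw_sleep tuning and statistics; treat every detail here as approximate, not as the SRS implementation): create a consumer on the shared source, then loop dumping queued messages and sending them to the player.
// Approximate shape of the play loop; not the verbatim SRS implementation.
srs_error_t do_playing_sketch(SrsRtmpServer* rtmp, SrsLiveSource* source, SrsCoroutine* trd)
{
    srs_error_t err = srs_success;
    // Each player gets its own consumer; the publisher's data is fanned out to all consumers.
    SrsLiveConsumer* consumer = NULL;
    if ((err = source->create_consumer(consumer)) != srs_success) {
        return srs_error_wrap(err, "create consumer");
    }
    SrsAutoFree(SrsLiveConsumer, consumer);
    SrsMessageArray msgs(128); // reusable array for batched sends
    while ((err = trd->pull()) == srs_success) {
        // Take whatever the source has queued for this consumer ...
        // (the real loop also waits on the consumer queue instead of spinning)
        int count = 0;
        if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {
            return srs_error_wrap(err, "dump packets");
        }
        // ... and push it to the player over the RTMP connection.
        if (count > 0 && (err = rtmp->send_and_free_messages(msgs.msgs, count, 1)) != srs_success) {
            return srs_error_wrap(err, "send messages");
        }
    }
    return err;
}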