Skip to content

Commit

Permalink
Merge pull request #6257 from ant-media/fix-missing-frames-in-hls
Browse files Browse the repository at this point in the history
Fix failing test on the cluster side
  • Loading branch information
mekya authored Apr 5, 2024
2 parents 2ee61e7 + 2f5e91f commit 1651c18
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 18 deletions.
11 changes: 8 additions & 3 deletions src/main/java/io/antmedia/muxer/Muxer.java
Original file line number Diff line number Diff line change
Expand Up @@ -629,9 +629,7 @@ public void init(IScope scope, final String name, int resolution, boolean overri
this.resolution = resolution;

//Refactor: Getting AppSettings smells here
IContext context = this.scope.getContext();
ApplicationContext appCtx = context.getApplicationContext();
AppSettings appSettings = (AppSettings) appCtx.getBean(AppSettings.BEAN_NAME);
AppSettings appSettings = getAppSettings();

initialResourceNameWithoutExtension = getExtendedName(name, resolution, bitrate, appSettings.getFileNameFormat());

Expand Down Expand Up @@ -671,6 +669,13 @@ public void init(IScope scope, final String name, int resolution, boolean overri

}
}

public AppSettings getAppSettings() {
IContext context = this.scope.getContext();
ApplicationContext appCtx = context.getApplicationContext();
return (AppSettings) appCtx.getBean(AppSettings.BEAN_NAME);
}

public String getExtendedName(String name, int resolution, int bitrate, String fileNameFormat){
// set default name
String resourceName = name;
Expand Down
13 changes: 9 additions & 4 deletions src/main/java/io/antmedia/muxer/RecordMuxer.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,9 @@ public synchronized void writeTrailer() {
vertx.executeBlocking(()->{
try {

IContext context = RecordMuxer.this.scope.getContext();
ApplicationContext appCtx = context.getApplicationContext();
AntMediaApplicationAdapter adaptor = (AntMediaApplicationAdapter) appCtx.getBean(AntMediaApplicationAdapter.BEAN_NAME);
AntMediaApplicationAdapter adaptor = getAppAdaptor();

AppSettings appSettings = (AppSettings) appCtx.getBean(AppSettings.BEAN_NAME);
AppSettings appSettings = getAppSettings();

File f = getFinalFileName(appSettings.isS3RecordingEnabled());

Expand Down Expand Up @@ -161,6 +159,13 @@ public synchronized void writeTrailer() {

}

public AntMediaApplicationAdapter getAppAdaptor() {
IContext context = RecordMuxer.this.scope.getContext();
ApplicationContext appCtx = context.getApplicationContext();
AntMediaApplicationAdapter adaptor = (AntMediaApplicationAdapter) appCtx.getBean(AntMediaApplicationAdapter.BEAN_NAME);
return adaptor;
}


public static String getS3Prefix(String s3FolderPath, String subFolder) {
return replaceDoubleSlashesWithSingleSlash(s3FolderPath + File.separator + (subFolder != null ? subFolder : "" ) + File.separator);
Expand Down
18 changes: 12 additions & 6 deletions src/main/java/io/antmedia/muxer/RtmpMuxer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@
import static org.bytedeco.ffmpeg.global.avcodec.av_packet_ref;
import static org.bytedeco.ffmpeg.global.avcodec.av_packet_unref;
import static org.bytedeco.ffmpeg.global.avcodec.avcodec_parameters_copy;
import static org.bytedeco.ffmpeg.global.avformat.av_write_frame;
import static org.bytedeco.ffmpeg.global.avformat.av_interleaved_write_frame;
import static org.bytedeco.ffmpeg.global.avformat.avformat_alloc_output_context2;
import static org.bytedeco.ffmpeg.global.avutil.*;
import static org.bytedeco.ffmpeg.global.avutil.AVMEDIA_TYPE_AUDIO;
import static org.bytedeco.ffmpeg.global.avutil.AVMEDIA_TYPE_VIDEO;
import static org.bytedeco.ffmpeg.global.avutil.AV_ROUND_NEAR_INF;
import static org.bytedeco.ffmpeg.global.avutil.AV_ROUND_PASS_MINMAX;
import static org.bytedeco.ffmpeg.global.avutil.av_rescale_q;
import static org.bytedeco.ffmpeg.global.avutil.av_rescale_q_rnd;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -27,7 +32,6 @@
import org.bytedeco.ffmpeg.avcodec.AVPacket;
import org.bytedeco.ffmpeg.avformat.AVFormatContext;
import org.bytedeco.ffmpeg.avformat.AVStream;
import org.bytedeco.ffmpeg.avutil.AVDictionary;
import org.bytedeco.ffmpeg.avutil.AVRational;
import org.bytedeco.ffmpeg.global.avcodec;
import org.bytedeco.ffmpeg.global.avutil;
Expand Down Expand Up @@ -336,7 +340,8 @@ private synchronized void writeFrameInternal(AVPacket pkt, AVRational inputTimeb
}
else if (codecType == AVMEDIA_TYPE_AUDIO && headerWritten)
{
ret = av_write_frame(context, pkt);
av_packet_ref(getTmpPacket() , pkt);
ret = av_interleaved_write_frame(context, getTmpPacket());
if (ret < 0 && logger.isInfoEnabled())
{
setStatus(IAntMediaStreamHandler.BROADCAST_STATUS_ERROR);
Expand All @@ -347,6 +352,7 @@ else if (codecType == AVMEDIA_TYPE_AUDIO && headerWritten)
logPacketIssue("Write audio packet for stream:{} and url:{}. Packet pts:{} dts:{}", streamId, getOutputURL(), pkt.pts(), pkt.dts());

}
av_packet_unref(getTmpPacket());
}

pkt.pts(pts);
Expand All @@ -356,14 +362,14 @@ else if (codecType == AVMEDIA_TYPE_AUDIO && headerWritten)
}

public void avWriteFrame(AVPacket pkt, AVFormatContext context) {
int ret;
int ret = 0;
boolean isKeyFrame = false;
if ((pkt.flags() & AV_PKT_FLAG_KEY) == 1) {
isKeyFrame = true;
}
addExtradataIfRequired(pkt, isKeyFrame);

ret = av_write_frame(context, getTmpPacket());
ret = av_interleaved_write_frame(context, getTmpPacket());
if (ret < 0 && logger.isInfoEnabled())
{
setStatus(IAntMediaStreamHandler.BROADCAST_STATUS_ERROR);
Expand Down
20 changes: 18 additions & 2 deletions src/main/java/io/antmedia/rest/RestServiceBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,8 @@ protected Result updateStreamSource(String streamId, Broadcast updatedBroadcast,
logger.debug("Updating stream source for stream {}", updatedBroadcast.getStreamId());

boolean isPlayList = AntMediaApplicationAdapter.PLAY_LIST.equals(broadcastInDB.getType());
if (!checkStreamUrl(updatedBroadcast.getStreamUrl()) && !isPlayList) {

if (StringUtils.isNotBlank(updatedBroadcast.getStreamUrl()) && !checkStreamUrl(updatedBroadcast.getStreamUrl()) && !isPlayList) {
return new Result(false, "Stream URL is not valid");
}

Expand All @@ -475,7 +476,22 @@ protected Result updateStreamSource(String streamId, Broadcast updatedBroadcast,
waitStopStreaming(broadcastInDB, resultStopStreaming);
}

if (AntMediaApplicationAdapter.IP_CAMERA.equals(updatedBroadcast.getType())) {
if (AntMediaApplicationAdapter.IP_CAMERA.equals(broadcastInDB.getType()) &&
!StringUtils.isAllBlank(updatedBroadcast.getIpAddr(), updatedBroadcast.getUsername(),updatedBroadcast.getPassword())) {

if (StringUtils.isBlank(updatedBroadcast.getIpAddr())) {
updatedBroadcast.setIpAddr(broadcastInDB.getIpAddr());
}

if (StringUtils.isBlank(updatedBroadcast.getUsername())) {
updatedBroadcast.setUsername(broadcastInDB.getUsername());
}

if (StringUtils.isBlank(updatedBroadcast.getPassword())) {
updatedBroadcast.setPassword(broadcastInDB.getPassword());
}


Result connectionRes = connectToCamera(updatedBroadcast);
if (!connectionRes.isSuccess()) {
return connectionRes;
Expand Down
10 changes: 7 additions & 3 deletions src/test/java/io/antmedia/test/MuxerUnitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import static org.bytedeco.ffmpeg.global.avformat.AVFMT_NOFILE;
import static org.bytedeco.ffmpeg.global.avformat.av_read_frame;
import static org.bytedeco.ffmpeg.global.avformat.av_stream_get_side_data;
import static org.bytedeco.ffmpeg.global.avformat.avformat_alloc_output_context2;
import static org.bytedeco.ffmpeg.global.avformat.*;
import static org.bytedeco.ffmpeg.global.avformat.avformat_close_input;
import static org.bytedeco.ffmpeg.global.avformat.avformat_find_stream_info;
import static org.bytedeco.ffmpeg.global.avformat.avformat_free_context;
Expand Down Expand Up @@ -1046,11 +1046,15 @@ public void testAVWriteFrame() {

RtmpMuxer rtmpMuxer = Mockito.spy(new RtmpMuxer(null, vertx));

AVFormatContext context = new AVFormatContext(null);
AVFormatContext context = new AVFormatContext();
int ret = avformat_alloc_output_context2(context, null, "flv", "test.flv");


//rtmpMuxer.set
AVPacket pkt = av_packet_alloc();

appScope = (WebScope) applicationContext.getBean("web.scope");

rtmpMuxer.init(appScope, "", 0, "", 0);

rtmpMuxer.avWriteFrame(pkt, context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2558,6 +2558,99 @@ public void testcheckStopStreaming()
assertFalse(streamSourceRest.checkStopStreaming(broadcast));
}

@Test
public void testUpdateIPCamera() {
Result result = new Result(false);

BroadcastRestService streamSourceRest = Mockito.spy(restServiceReal);

AppSettings settings = mock(AppSettings.class);
when(settings.getListenerHookURL()).thenReturn(null);
streamSourceRest.setAppSettings(settings);

AntMediaApplicationAdapter adaptor = mock (AntMediaApplicationAdapter.class);

ServerSettings serverSettings = Mockito.mock(ServerSettings.class);
streamSourceRest.setServerSettings(serverSettings);

Scope scope = mock(Scope.class);
String scopeName = "scope";
when(scope.getName()).thenReturn(scopeName);

streamSourceRest.setScope(scope);

Broadcast streamSource = new Broadcast("testIPcamera", "ipAddr", "username", "password",
null, AntMediaApplicationAdapter.IP_CAMERA);

streamSource.setStatus(AntMediaApplicationAdapter.BROADCAST_STATUS_BROADCASTING);

StreamFetcher fetcher = mock(StreamFetcher.class);

try {
streamSource.setStreamId("test");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

InMemoryDataStore store = new InMemoryDataStore("test");

Mockito.doReturn(adaptor).when(streamSourceRest).getApplication();
Mockito.doReturn(new Result(true)).when(adaptor).startStreaming(streamSource);
Mockito.doReturn(store).when(streamSourceRest).getDataStore();
Mockito.doReturn(new Result(true, "rtsp://test")).when(streamSourceRest).connectToCamera(Mockito.any());

store.save(streamSource);

// Check Stream source update working normal.

Mockito.doReturn(true).when(streamSourceRest).checkStreamUrl(any());

Mockito.doReturn(true).when(streamSourceRest).checkStopStreaming(any());

result = streamSourceRest.updateBroadcast(streamSource.getStreamId(), streamSource);

assertEquals(true, result.isSuccess());
Mockito.verify(streamSourceRest).connectToCamera(Mockito.any());


Broadcast updateBroadcast = new Broadcast("testIPcamera", "", "", "",
"rtsp://test2", "");
try {
updateBroadcast.setStreamId("test");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
result = streamSourceRest.updateBroadcast(updateBroadcast.getStreamId(), updateBroadcast);

assertEquals(true, result.isSuccess());
Mockito.verify(streamSourceRest, Mockito.times(1)).connectToCamera(Mockito.any());

Broadcast broadcastInDB = store.get("test");
assertEquals("rtsp://test2", broadcastInDB.getStreamUrl());



updateBroadcast.setIpAddr("new ip");
updateBroadcast.setUsername("new user");
updateBroadcast.setPassword("");
result = streamSourceRest.updateBroadcast(updateBroadcast.getStreamId(), updateBroadcast);

assertEquals(true, result.isSuccess());
Mockito.verify(streamSourceRest, Mockito.times(2)).connectToCamera(Mockito.any());


updateBroadcast.setIpAddr("");
updateBroadcast.setUsername("");
updateBroadcast.setPassword("pass");
result = streamSourceRest.updateBroadcast(updateBroadcast.getStreamId(), updateBroadcast);
assertEquals(true, result.isSuccess());
Mockito.verify(streamSourceRest, Mockito.times(3)).connectToCamera(Mockito.any());


}

@Test
public void testUpdateStreamSource() {

Expand Down

0 comments on commit 1651c18

Please sign in to comment.