/**
* Copyright (c) 2006 - 2008 Smaxe Ltd (www.smaxe.com).
* All rights reserved.
*/
package com.smaxe.app.uv.downloader;
import com.smaxe.logger.ILogger;
import com.smaxe.logger.support.Loggers;
import com.smaxe.uv.UrlInfo;
import com.smaxe.uv.client.INetConnection;
import com.smaxe.uv.client.INetStream;
import com.smaxe.uv.client.NetConnection;
import com.smaxe.uv.client.NetStream;
import com.smaxe.uv.client.video.FlvVideo;
import com.smaxe.uv.communication.Protocol;
import com.smaxe.uv.rtmp.core.Status;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* <code>RtmpDownloader</code> - downloads audio/video stream from the RTMP/RTMPT-enabled
* server to the local FLV file.
*
* @author Andrei Sochirca
*/
public final class RtmpDownloader
{
    /** Interval, in milliseconds, between two state checks made by the <code>get</code> methods. */
    private static final long WAIT_POLL_MILLIS = 100L;
    /** Interval, in milliseconds, between two state checks made by the task thread. */
    private static final long RUN_POLL_MILLIS = 200L;

    /**
     * <code>DownloadTask</code> - download task.
     * <p> The task is executed on its own thread (see {@link RtmpDownloader#download}),
     * is updated from the connection/stream listener callbacks and is polled by the
     * callers of the {@link Future} methods, so the state flags are <code>volatile</code>.
     */
    private final class DownloadTask implements Future<Boolean>, Runnable
    {
        /**
         * <code>NetConnectionListener</code> - {@link NetConnection} listener.
         */
        private class NetConnectionListener extends NetConnection.ListenerAdapter
        {
            // fields
            private final FlvVideo flvVideo;
            private final Object stream;

            /**
             * Constructor.
             *
             * @param stream stream to play
             * @param flvVideo file
             */
            public NetConnectionListener(final Object stream, final FlvVideo flvVideo)
            {
                this.flvVideo = flvVideo;
                this.stream = stream;
            }

            @Override
            public void onAsyncError(final INetConnection source, final String message, final Exception e)
            {
                logger.log(ILogger.DEBUG, "NetConnection#onAsyncError: " + message, e);
                disconnected = true;
            }

            @Override
            public void onIOError(final INetConnection source, final String message)
            {
                logger.log(ILogger.DEBUG, "NetConnection#onIOError: " + message, null);
                disconnected = true;
            }

            @Override
            public void onNetStatus(final INetConnection source, final Map<String, Object> info)
            {
                logger.log(ILogger.DEBUG, "NetConnection#onNetStatus: " + info, null);
                final Object code = info.get(Status.CODE);
                if (NetConnection.CONNECT_SUCCESS.equals(code))
                {
                    // the stream is started on the dispatcher thread, not on the callback thread
                    dispatcher.execute(new Runnable()
                    {
                        public void run()
                        {
                            try
                            {
                                startPlayback(source);
                            }
                            catch (RuntimeException e)
                            {
                                // without this the failure would be lost inside the executor
                                // thread and the task would never complete
                                fail(e);
                            }
                        }
                    });
                }
                else if (NetConnection.CONNECT_CLOSED.equals(code))
                {
                    flvVideo.release();
                    disconnected = true;
                }
                else
                {
                    // any other status code is reported as the task failure
                    result = new Exception((String) info.get(Status.DESCRIPTION));
                    disconnected = true;
                }
            }

            /**
             * Creates the stream on the <code>connection</code> and starts playing it
             * to the file; the task is completed when the stream is stopped or unpublished.
             *
             * @param connection established connection
             */
            private void startPlayback(final INetConnection connection)
            {
                final NetStream netStream = new NetStream(connection);
                netStream.addEventListener(new NetStream.ListenerAdapter()
                {
                    @Override
                    public void onNetStatus(final INetStream source, final Map<String, Object> info)
                    {
                        logger.log(ILogger.DEBUG, "NetStream#onNetStatus: " + info, null);
                        final String code = (String) info.get(Status.CODE);
                        if (NetStream.PLAY_STOP.equals(code) || NetStream.UNPUBLISH_SUCCESS.equals(code))
                        {
                            netStream.close();
                            flvVideo.release();
                            // the result is published before the 'disconnected' flag
                            result = Boolean.TRUE;
                            disconnected = true;
                        }
                    }
                });
                netStream.play(flvVideo, stream);
            }
        }

        // fields
        private final String url;
        private final Object[] args;
        private final Map<String, Object> configuration;
        private final Object stream;
        private final FlvVideo flvVideo;
        private volatile ExecutorService dispatcher = null;
        // flags (written by the listener callbacks, read by the waiting threads)
        private volatile Object result = null;
        private volatile boolean disconnected = false;
        private volatile boolean cancelled = false;

        /**
         * Constructor.
         *
         * @param url server url
         * @param args connection arguments
         * @param configuration connection configuration
         * @param stream stream to download
         * @param file local file to store the stream
         * @throws Exception if an exception occurred
         */
        public DownloadTask(final String url, final Object[] args, final Map<String, Object> configuration,
                final Object stream, final String file) throws Exception
        {
            this.url = url;
            this.args = args;
            this.configuration = configuration;
            this.stream = stream;
            this.flvVideo = new FlvVideo(file, 512 * 1024, true /*sync*/);
        }

        public boolean isCancelled()
        {
            return cancelled;
        }

        public boolean isDone()
        {
            return cancelled || result != null || disconnected;
        }

        public boolean cancel(final boolean mayInterruptIfRunning)
        {
            if (cancelled || disconnected) return false;
            cancelled = true;
            // makes the task thread leave its wait loop and close the connection
            disconnected = true;
            return true;
        }

        public Boolean get() throws InterruptedException, ExecutionException
        {
            while (!isDone())
            {
                Thread.sleep(WAIT_POLL_MILLIS);
            }
            return outcome();
        }

        public Boolean get(final long timeout, final TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException
        {
            // the timeout is converted FROM 'unit'; the elapsed time is compared
            // (instead of a deadline) to stay safe for very large timeouts
            final long timeoutNanos = unit.toNanos(timeout);
            final long startNanos = System.nanoTime();
            while (!isDone())
            {
                final long remainingNanos = timeoutNanos - (System.nanoTime() - startNanos);
                if (remainingNanos <= 0) throw new TimeoutException();
                final long remainingMillis = Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
                Thread.sleep(Math.min(WAIT_POLL_MILLIS, remainingMillis));
            }
            return outcome();
        }

        public void run()
        {
            dispatcher = Executors.newSingleThreadExecutor();
            NetConnection connection = null;
            try
            {
                connection = new NetConnection(configuration);
                connection.addEventListener(new NetConnectionListener(stream, flvVideo));
                UrlInfo info = UrlInfo.parseUrl(url);
                if (info.protocol != null && info.protocol.indexOf('e') >= 0)
                {
                    // a protocol name that contains 'e' is replaced with plain RTMP
                    info = new UrlInfo(Protocol.RTMP, info.host, info.port, info.app, info.instance, info.scope, info.parameters);
                }
                connection.connect(info.toString(), args);
                while (!isDone())
                {
                    try
                    {
                        Thread.sleep(RUN_POLL_MILLIS);
                    }
                    catch (InterruptedException e) {/*ignore: the task is stopped through cancel()*/}
                }
            }
            catch (RuntimeException e)
            {
                // otherwise the waiting callers would never be released
                logger.log(ILogger.DEBUG, "DownloadTask#run: " + e.getMessage(), e);
                fail(e);
            }
            finally
            {
                try
                {
                    if (connection != null) connection.close();
                }
                finally
                {
                    dispatcher.shutdownNow();
                }
            }
        }

        /**
         * Marks the task as failed with the <code>cause</code>; a result that
         * is already set is kept.
         *
         * @param cause failure cause
         */
        private void fail(final Exception cause)
        {
            if (result == null) result = cause;
            disconnected = true;
        }

        /**
         * Translates the state of the finished task to the {@link Future#get()} outcome.
         *
         * @return task result
         * @throws CancellationException if the task was cancelled
         * @throws ExecutionException if the task failed or the connection was closed
         * before the result was set
         */
        private Boolean outcome() throws ExecutionException
        {
            if (cancelled) throw new CancellationException();
            final Object outcome = result;
            if (outcome instanceof Boolean)
            {
                return (Boolean) outcome;
            }
            if (outcome instanceof Exception)
            {
                throw new ExecutionException((Exception) outcome);
            }
            // disconnected (I/O error, async error, connection closed) without any result
            throw new ExecutionException(new Exception("Connection closed before the download completed"));
        }
    }

    // fields
    private ILogger logger = null;

    /**
     * Constructor.
     */
    public RtmpDownloader()
    {
        this.setLogger(null);
        this.setDebugMode(false);
    }

    /**
     * Set <code>true</code> for debug mode.
     *
     * @param debug set <code>true</code> to enable debug mode; <code>false</code> to disable
     */
    public void setDebugMode(final boolean debug)
    {
        this.logger = debug ? Loggers.createSoLogger(ILogger.DEBUG, "Downloader") : Loggers.createNullLogger();
    }

    /**
     * Sets the logger.
     *
     * @param logger logger; <code>null</code> is replaced with the null logger
     */
    private void setLogger(final ILogger logger)
    {
        this.logger = logger == null ? Loggers.createNullLogger() : logger;
    }

    /**
     * Downloads <code>stream</code> from the <code>url</code> and
     * saves it to the <code>file</code>. The download runs on a new thread.
     * <p> Note: Connection configuration fields are:
     * <br> "fpad" - (default: false)
     * <br> "pageUrl" - page url (default: "")
     * <br> "swfUrl" - SWF url (default: "")
     * <br> "flashVer" - flash version (default: "WIN 9,0,124,0")
     * <br> "audioCodecs" - audio codecs (default: 615)
     * <br> "videoCodecs" - video codecs (default: 124)
     * <br> "videoFunction" - video function (default: 1)
     * <br>
     * <br> <code>stream</code> parameter can be either String (name of the stream) or
     * an array, please check developer guide for details.
     *
     * @param url connection url
     * @param args connection arguments
     * @param configuration connection configuration, can be <code>null</code>;
     * the passed map is not modified
     * @param stream stream to record
     * @param file local file to store stream
     * @return download task handle: <code>get()</code> returns <code>true</code>
     * when the stream is downloaded and throws <code>ExecutionException</code>
     * when the download failed
     * @throws Exception if an exception occurred
     */
    public Future<Boolean> download(final String url, final Object[] args, final Map<String, Object> configuration,
            final Object stream, final String file) throws Exception
    {
        final DownloadTask task = new DownloadTask(url, args,
                prepareConfiguration(configuration), stream, file);
        new Thread(task).start();
        return task;
    }

    /**
     * Prepares <code>configuration</code>: sets the logger and the default
     * values of the properties that are not set.
     *
     * @param configuration connection configuration, can be <code>null</code>
     * @return copy of the configuration with the logger and the defaults set
     */
    private Map<String, Object> prepareConfiguration(final Map<String, Object> configuration)
    {
        // a copy is used: the map of the caller is not changed
        final Map<String, Object> conf = configuration == null
                ? new HashMap<String, Object>() : new HashMap<String, Object>(configuration);
        // beans
        conf.put(INetConnection.Configuration.LOGGER, logger);
        //
        setDefaultIfNotSet(conf, "IOTimeout", 75);
        setDefaultIfNotSet(conf, INetConnection.Configuration.RECEIVE_BUFFER_SIZE, 128 * 1024);
        setDefaultIfNotSet(conf, INetConnection.Configuration.SEND_BUFFER_SIZE, 16 * 1024);
        setDefaultIfNotSet(conf, INetConnection.Configuration.STREAM_BUFFER_SIZE, 4 * 128 * 1024);
        return conf;
    }

    /**
     * Sets default value if the value is not set as parameter
     * (a property mapped to <code>null</code> is treated as not set).
     *
     * @param configuration configuration to update; nothing is done if <code>null</code>
     * @param property property name
     * @param defaultValue value to set if the property is not set
     */
    private void setDefaultIfNotSet(final Map<String, Object> configuration, final String property, final Object defaultValue)
    {
        if (configuration == null || configuration.get(property) != null) return;
        configuration.put(property, defaultValue);
    }
}