/**
* Copyright (c) 2006 - 2009 Smaxe Ltd (www.smaxe.com).
* All rights reserved.
*/
import com.smaxe.uv.client.ICamera;
import com.smaxe.uv.client.IMicrophone;
import com.smaxe.uv.client.rtmp.INetConnection;
import com.smaxe.uv.client.rtmp.INetStream;
import com.smaxe.uv.client.rtmp.IVideo;
import com.smaxe.uv.client.rtmp.video.AbstractVideo;
import com.smaxe.uv.stream.MediaData;
import com.smaxe.uv.stream.MediaDataFactory;
import java.io.IOException;
import java.util.Map;
/**
* <code>ExRtmpRetransmitter</code> - retrieves RTMP <code>sourceStream</code> from <code>sourceUrl</code> and
* publishes it to the <code>destinationUrl</code>.
*
* @author Andrei Sochirca
* @see <a href="http://www.smaxe.com/product.jsf?id=juv-rtmp-client" target="_blank">JUV RTMP Client</a>
* @see <a href="http://www.smaxe.com/product.jsf?id=juv-rtmfp-client" target="_blank">JUV RTMFP/RTMP Client</a>
*/
public final class ExRtmpRetransmitter extends Object
{
/**
* Entry point.
*
* @param args
* @throws Exception if an exception occurred
*/
public static void main(final String[] args) throws Exception
{
// NOTE:
// you can get Evaluation Key at:
// http://www.smaxe.com/order.jsf#request_evaluation_key
// or buy at:
// http://www.smaxe.com/order.jsf
// Android-specific:
// - please add permission to the AndroidManifest.xml :
// <uses-permission android:name="android.permission.INTERNET" />
// - please use separate thread to connect to the server (not UI thread) :
// NetConnection#connect() connects to the remote server in the invocation thread,
// so it causes NetworkOnMainThreadException on 4.0 (http://developer.android.com/reference/android/os/NetworkOnMainThreadException.html)
// Nginx-specific:
// - please use AMF0 encoding
com.smaxe.uv.client.rtmp.License.setKey("SET-YOUR-KEY");
final String sourceUrl = "rtmp://localhost:1935/live";
final String sourceStream = "livestream";
final String destinationUrl = "rtmp://localhost:1935/live";
final String destinationStream = "livestream2";
final AudioVideoStream avstream = new AudioVideoStream();
new Thread(new Runnable()
{
public void run()
{
final Player player = new Player();
player.play(new com.smaxe.uv.client.rtmp.NetConnection(), sourceUrl, sourceStream, avstream);
}
}, "ExRtmpRetransmitter-Player").start();
Thread.sleep(1000);
new Thread(new Runnable()
{
public void run()
{
final Publisher publisher = new Publisher();
publisher.publish(new com.smaxe.uv.client.rtmp.NetConnection(), destinationUrl, destinationStream, avstream, avstream);
}
}, "ExRtmpRetransmitter-Publisher").start();
}
/**
* <code>AudioVideoStream</code> - a/v stream.
*/
private static class AudioVideoStream extends AbstractVideo implements IMicrophone, ICamera
{
// fields
private IMicrophone.IListener microphoneListener = null;
private ICamera.IListener cameraListener = null;
private byte[] audioConfiguration = null;
private byte[] videoConfiguration = null;
private boolean active = false;
/**
* Constructor.
*/
public AudioVideoStream()
{
}
// IVideo implementation
@Override
public void onAudioData(final MediaData data)
{
if (microphoneListener == null)
{
final int tag = data.tag();
if (getAudioCodec(tag) == 0x0A) // AAC
{
byte[] id = new byte[2];
try
{
data.read().read(id);
}
catch (IOException e)
{
e.printStackTrace();
}
if (id[0] == (byte) 0xAF && id[1] == 0x00) // is config frame?
{
audioConfiguration = new byte[data.size()];
try
{
data.read().read(audioConfiguration);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
}
else
{
if (active)
{
microphoneListener.onAudioData(data);
}
}
}
@Override
public void onVideoData(final MediaData data)
{
if (cameraListener == null)
{
final int tag = data.tag();
if (getVideoCodec(tag) == 0x07) // AVC
{
byte[] id = new byte[2];
try
{
data.read().read(id);
}
catch (IOException e)
{
e.printStackTrace();
}
if (id[0] == 0x17 && id[1] == 0x00) // is config frame?
{
videoConfiguration = new byte[data.size()];
try
{
data.read().read(videoConfiguration);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
}
else
{
if (!active)
{
if (getVideoFrame(data.tag()) != 0x01) // is key frame?
{
return;
}
else
{
active = true;
}
}
cameraListener.onVideoData(data);
}
}
@Override
public void onFlvData(final MediaData data)
{
if (cameraListener != null)
{
cameraListener.onFlvData(data);
}
}
@Override
public void onCuePoint(final Object data)
{
System.out.println("onCuePoint: " + data);
}
@Override
public void onMetaData(final Object data)
{
System.out.println("onMetaData: " + data);
}
// IMicrophone implementation
public void addListener(final IMicrophone.IListener listener)
{
if (audioConfiguration != null)
{
listener.onAudioData(MediaDataFactory.create(0 /*rtime*/, audioConfiguration));
}
microphoneListener = listener;
}
public void removeListener(final IMicrophone.IListener listener)
{
microphoneListener = null;
}
// ICamera implementation
public void addListener(final ICamera.IListener listener)
{
if (videoConfiguration != null)
{
listener.onVideoData(MediaDataFactory.create(0 /*rtime*/, videoConfiguration));
}
cameraListener = listener;
}
public void removeListener(final ICamera.IListener listener)
{
cameraListener = new ICamera.ListenerAdapter();
}
// inner use methods
/**
* @param tag
* @return format encoded in the <code>tag</code>
*/
public int getAudioCodec(final int tag)
{
return (tag >> 4) & 0x0F;
}
/**
* @param tag
* @return codec encoded in the <code>tag</code>
*/
public int getVideoCodec(final int tag)
{
return tag & 0x0F;
}
/**
* @param tag
* @return frame encoded in the <code>tag</code>
*/
public int getVideoFrame(final int tag)
{
return (tag >> 4) & 0x0F;
}
}
/**
* <code>Player</code> - player.
*/
public static final class Player extends Object
{
/**
* <code>NetConnectionListener</code> - {@link INetConnection} listener implementation.
*/
public class NetConnectionListener extends INetConnection.ListenerAdapter
{
/**
* Constructor.
*/
public NetConnectionListener()
{
}
@Override
public void onAsyncError(final INetConnection source, final String message, final Exception e)
{
System.out.println("Player#NetConnection#onAsyncError: " + message + " " + e);
}
@Override
public void onIOError(final INetConnection source, final String message)
{
System.out.println("Player#NetConnection#onIOError: " + message);
}
@Override
public void onNetStatus(final INetConnection source, final Map<String, Object> info)
{
System.out.println("Player#NetConnection#onNetStatus: " + info);
final Object code = info.get("code");
if (INetConnection.CONNECT_SUCCESS.equals(code))
{
}
else
{
disconnected = true;
}
}
}
// fields
private volatile boolean disconnected = false;
/**
* Constructor.
*/
public Player()
{
}
/**
* Plays the stream.
*
* @param connection
* @param url
* @param streamName
* @param video
*/
public void play(final INetConnection connection, final String url, final String streamName, final IVideo video)
{
connection.addEventListener(new NetConnectionListener());
connection.connect(url);
// wait till connected
while (!connection.connected() && !disconnected)
{
try
{
Thread.sleep(100);
}
catch (Exception e) {/*ignore*/}
}
if (!disconnected)
{
INetStream stream = connection.createNetStream();
stream.addEventListener(new INetStream.ListenerAdapter()
{
@Override
public void onNetStatus(final INetStream source, final Map<String, Object> info)
{
System.out.println("Player#NetStream#onNetStatus: " + info);
}
});
stream.play(video, streamName);
}
while (!disconnected)
{
try
{
Thread.sleep(100);
}
catch (Exception e) {/*ignore*/}
}
connection.close();
}
}
/**
* <code>Publisher</code> - publisher.
*/
public static final class Publisher extends Object
{
/**
* <code>NetConnectionListener</code> - {@link INetConnection} listener implementation.
*/
private final class NetConnectionListener extends INetConnection.ListenerAdapter
{
/**
* Constructor.
*/
public NetConnectionListener()
{
}
@Override
public void onAsyncError(final INetConnection source, final String message, final Exception e)
{
System.out.println("Publisher#NetConnection#onAsyncError: " + message + " " + e);
}
@Override
public void onIOError(final INetConnection source, final String message)
{
System.out.println("Publisher#NetConnection#onIOError: " + message);
}
@Override
public void onNetStatus(final INetConnection source, final Map<String, Object> info)
{
System.out.println("Publisher#NetConnection#onNetStatus: " + info);
final Object code = info.get("code");
if (INetConnection.CONNECT_SUCCESS.equals(code))
{
}
else
{
disconnected = true;
}
}
}
// fields
private volatile boolean disconnected = false;
private INetStream stream = null;
/**
* Publishes the stream.
*
* @param connection
* @param url
* @param streamName
* @param microphone microphone
* @param camera camera
*/
public void publish(final INetConnection connection, final String url, final String streamName, final IMicrophone microphone, final ICamera camera)
{
connection.configuration().put(INetConnection.Configuration.INACTIVITY_TIMEOUT, -1);
connection.configuration().put(INetConnection.Configuration.IO_TIMEOUT, 20 /*milliseconds*/);
connection.configuration().put(INetConnection.Configuration.RECEIVE_BUFFER_SIZE, 256 * 1024);
connection.configuration().put(INetConnection.Configuration.SEND_BUFFER_SIZE, 256 * 1024);
connection.configuration().put(INetConnection.Configuration.ENABLE_MEDIA_STREAM_ABSOLUTE_TIMESTAMP, true);
connection.addEventListener(new NetConnectionListener());
connection.connect(url);
// wait till connected
while (!connection.connected() && !disconnected)
{
try
{
Thread.sleep(100);
}
catch (Exception e) {/*ignore*/}
}
if (!disconnected)
{
final INetStream stream = connection.createNetStream();
stream.addEventListener(new INetStream.ListenerAdapter()
{
@Override
public void onNetStatus(final INetStream source, final Map<String, Object> info)
{
System.out.println("Publisher#NetStream#onNetStatus: " + info);
final Object code = info.get("code");
if (INetStream.PUBLISH_START.equals(code))
{
if (microphone != null)
{
stream.attachAudio(microphone);
}
if (camera != null)
{
stream.attachCamera(camera, -1 /*snapshotMilliseconds*/);
}
}
}
});
stream.publish(streamName, INetStream.LIVE);
this.stream = stream;
}
while (!disconnected)
{
try
{
Thread.sleep(100);
}
catch (Exception e) {/*ignore*/}
}
connection.close();
}
/**
* Sends a message on the published stream to all subscribing clients.
*
* @param handler handler name
* @param args optional arguments that can be of any type
* @return <code>true</code> if sent; otherwise <code>false</code>
*/
public boolean sendStreamMessage(final String handler, final Object... args)
{
if (stream == null) return false;
stream.send(handler, args);
return true;
}
}
}