aboutsummaryrefslogtreecommitdiff
path: root/tuner/src/com/android/tv/tuner/source/TunerTsStreamer.java
diff options
context:
space:
mode:
Diffstat (limited to 'tuner/src/com/android/tv/tuner/source/TunerTsStreamer.java')
-rw-r--r--tuner/src/com/android/tv/tuner/source/TunerTsStreamer.java420
1 files changed, 420 insertions, 0 deletions
diff --git a/tuner/src/com/android/tv/tuner/source/TunerTsStreamer.java b/tuner/src/com/android/tv/tuner/source/TunerTsStreamer.java
new file mode 100644
index 00000000..21b7a1f8
--- /dev/null
+++ b/tuner/src/com/android/tv/tuner/source/TunerTsStreamer.java
@@ -0,0 +1,420 @@
+/*
+ * Copyright (C) 2015 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.tv.tuner.source;
+
+import android.content.Context;
+import android.util.Log;
+import android.util.Pair;
+import com.android.tv.common.SoftPreconditions;
+import com.android.tv.tuner.ChannelScanFileParser;
+import com.android.tv.tuner.TunerHal;
+import com.android.tv.tuner.TunerPreferences;
+import com.android.tv.tuner.data.TunerChannel;
+import com.android.tv.tuner.tvinput.EventDetector;
+import com.android.tv.tuner.tvinput.EventDetector.EventListener;
+import com.google.android.exoplayer.C;
+import com.google.android.exoplayer.upstream.DataSpec;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** Provides MPEG-2 TS stream sources for channel playing from an underlying tuner device. */
+public class TunerTsStreamer implements TsStreamer {
+ private static final String TAG = "TunerTsStreamer";
+
+ private static final int MIN_READ_UNIT = 1500;
+ private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~15KB
+ private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 20000; // ~ 30MB
+ private static final int TS_PACKET_SIZE = 188;
+
+ private static final int READ_TIMEOUT_MS = 5000; // 5 secs.
+ private static final int BUFFER_UNDERRUN_SLEEP_MS = 10;
+ private static final int READ_ERROR_STREAMING_ENDED = -1;
+ private static final int READ_ERROR_BUFFER_OVERWRITTEN = -2;
+
+ private final Object mCircularBufferMonitor = new Object();
+ private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE];
+ private long mBytesFetched;
+ private final AtomicLong mLastReadPosition = new AtomicLong();
+ private boolean mStreaming;
+
+ private final TunerHal mTunerHal;
+ private TunerChannel mChannel;
+ private Thread mStreamingThread;
+ private final EventDetector mEventDetector;
+ private final List<Pair<EventListener, Boolean>> mEventListenerActions = new ArrayList<>();
+
+ private final TsStreamWriter mTsStreamWriter;
+ private String mChannelNumber;
+
+ public static class TunerDataSource extends TsDataSource {
+ private final TunerTsStreamer mTsStreamer;
+ private final AtomicLong mLastReadPosition = new AtomicLong(0);
+ private long mStartBufferedPosition;
+
+ private TunerDataSource(TunerTsStreamer tsStreamer) {
+ mTsStreamer = tsStreamer;
+ mStartBufferedPosition = tsStreamer.getBufferedPosition();
+ }
+
+ @Override
+ public long getBufferedPosition() {
+ return mTsStreamer.getBufferedPosition() - mStartBufferedPosition;
+ }
+
+ @Override
+ public long getLastReadPosition() {
+ return mLastReadPosition.get();
+ }
+
+ @Override
+ public void shiftStartPosition(long offset) {
+ SoftPreconditions.checkState(mLastReadPosition.get() == 0);
+ SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition());
+ mStartBufferedPosition += offset;
+ }
+
+ @Override
+ public long open(DataSpec dataSpec) throws IOException {
+ mLastReadPosition.set(0);
+ return C.LENGTH_UNBOUNDED;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public int read(byte[] buffer, int offset, int readLength) throws IOException {
+ int ret =
+ mTsStreamer.readAt(
+ mStartBufferedPosition + mLastReadPosition.get(),
+ buffer,
+ offset,
+ readLength);
+ if (ret > 0) {
+ mLastReadPosition.addAndGet(ret);
+ } else if (ret == READ_ERROR_BUFFER_OVERWRITTEN) {
+ long currentPosition = mStartBufferedPosition + mLastReadPosition.get();
+ long endPosition = mTsStreamer.getBufferedPosition();
+ long diff =
+ ((endPosition - currentPosition + TS_PACKET_SIZE - 1) / TS_PACKET_SIZE)
+ * TS_PACKET_SIZE;
+ Log.w(TAG, "Demux position jump by overwritten buffer: " + diff);
+ mStartBufferedPosition = currentPosition + diff;
+ mLastReadPosition.set(0);
+ return 0;
+ }
+ return ret;
+ }
+ }
+ /**
+ * Creates {@link TsStreamer} for playing or recording the specified channel.
+ *
+ * @param tunerHal the HAL for tuner device
+ * @param eventListener the listener for channel & program information
+ */
+ public TunerTsStreamer(TunerHal tunerHal, EventListener eventListener, Context context) {
+ mTunerHal = tunerHal;
+ mEventDetector = new EventDetector(mTunerHal);
+ if (eventListener != null) {
+ mEventDetector.registerListener(eventListener);
+ }
+ mTsStreamWriter =
+ context != null && TunerPreferences.getStoreTsStream(context)
+ ? new TsStreamWriter(context)
+ : null;
+ }
+
+ public TunerTsStreamer(TunerHal tunerHal, EventListener eventListener) {
+ this(tunerHal, eventListener, null);
+ }
+
+ @Override
+ public boolean startStream(TunerChannel channel) {
+ if (mTunerHal.tune(
+ channel.getFrequency(), channel.getModulation(), channel.getDisplayNumber(false))) {
+ if (channel.hasVideo()) {
+ mTunerHal.addPidFilter(channel.getVideoPid(), TunerHal.FILTER_TYPE_VIDEO);
+ }
+ boolean audioFilterSet = false;
+ for (Integer audioPid : channel.getAudioPids()) {
+ if (!audioFilterSet) {
+ mTunerHal.addPidFilter(audioPid, TunerHal.FILTER_TYPE_AUDIO);
+ audioFilterSet = true;
+ } else {
+ // FILTER_TYPE_AUDIO overrides the previous filter for audio. We use
+ // FILTER_TYPE_OTHER from the secondary one to get the all audio tracks.
+ mTunerHal.addPidFilter(audioPid, TunerHal.FILTER_TYPE_OTHER);
+ }
+ }
+ mTunerHal.addPidFilter(channel.getPcrPid(), TunerHal.FILTER_TYPE_PCR);
+ if (mEventDetector != null) {
+ mEventDetector.startDetecting(
+ channel.getFrequency(),
+ channel.getModulation(),
+ channel.getProgramNumber());
+ }
+ mChannel = channel;
+ mChannelNumber = channel.getDisplayNumber();
+ synchronized (mCircularBufferMonitor) {
+ if (mStreaming) {
+ Log.w(TAG, "Streaming should be stopped before start streaming");
+ return true;
+ }
+ mStreaming = true;
+ mBytesFetched = 0;
+ mLastReadPosition.set(0L);
+ }
+ if (mTsStreamWriter != null) {
+ mTsStreamWriter.setChannel(mChannel);
+ mTsStreamWriter.openFile();
+ }
+ mStreamingThread = new StreamingThread();
+ mStreamingThread.start();
+ Log.i(TAG, "Streaming started");
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean startStream(ChannelScanFileParser.ScanChannel channel) {
+ if (mTunerHal.tune(channel.frequency, channel.modulation, null)) {
+ mEventDetector.startDetecting(
+ channel.frequency, channel.modulation, EventDetector.ALL_PROGRAM_NUMBERS);
+ synchronized (mCircularBufferMonitor) {
+ if (mStreaming) {
+ Log.w(TAG, "Streaming should be stopped before start streaming");
+ return true;
+ }
+ mStreaming = true;
+ mBytesFetched = 0;
+ mLastReadPosition.set(0L);
+ }
+ mStreamingThread = new StreamingThread();
+ mStreamingThread.start();
+ Log.i(TAG, "Streaming started");
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Blocks the current thread until the streaming thread stops. In rare cases when the tuner
+ * device is overloaded this can take a while, but usually it returns pretty quickly.
+ */
+ @Override
+ public void stopStream() {
+ mChannel = null;
+ synchronized (mCircularBufferMonitor) {
+ mStreaming = false;
+ mCircularBufferMonitor.notifyAll();
+ }
+
+ try {
+ if (mStreamingThread != null) {
+ mStreamingThread.join();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ if (mTsStreamWriter != null) {
+ mTsStreamWriter.closeFile(true);
+ mTsStreamWriter.setChannel(null);
+ }
+ }
+
+ @Override
+ public TsDataSource createDataSource() {
+ return new TunerDataSource(this);
+ }
+
+ /**
+ * Returns incomplete channel lists which was scanned so far. Incomplete channel means the
+ * channel whose channel information is not complete or is not well-formed.
+ *
+ * @return {@link List} of {@link TunerChannel}
+ */
+ public List<TunerChannel> getMalFormedChannels() {
+ return mEventDetector.getMalFormedChannels();
+ }
+
+ /**
+ * Returns the current {@link TunerHal} which provides MPEG-TS stream for TunerTsStreamer.
+ *
+ * @return {@link TunerHal}
+ */
+ public TunerHal getTunerHal() {
+ return mTunerHal;
+ }
+
+ /**
+ * Returns the current tuned channel for TunerTsStreamer.
+ *
+ * @return {@link TunerChannel}
+ */
+ public TunerChannel getChannel() {
+ return mChannel;
+ }
+
+ /**
+ * Returns the current buffered position from tuner.
+ *
+ * @return the current buffered position
+ */
+ public long getBufferedPosition() {
+ synchronized (mCircularBufferMonitor) {
+ return mBytesFetched;
+ }
+ }
+
+ public String getStreamerInfo() {
+ return "Channel: " + mChannelNumber + ", Streaming: " + mStreaming;
+ }
+
+ public void registerListener(EventListener listener) {
+ if (mEventDetector != null && listener != null) {
+ synchronized (mEventListenerActions) {
+ mEventListenerActions.add(new Pair<>(listener, true));
+ }
+ }
+ }
+
+ public void unregisterListener(EventListener listener) {
+ if (mEventDetector != null) {
+ synchronized (mEventListenerActions) {
+ mEventListenerActions.add(new Pair(listener, false));
+ }
+ }
+ }
+
+ private class StreamingThread extends Thread {
+ @Override
+ public void run() {
+ // Buffers for streaming data from the tuner and the internal buffer.
+ byte[] dataBuffer = new byte[READ_BUFFER_SIZE];
+
+ while (true) {
+ synchronized (mCircularBufferMonitor) {
+ if (!mStreaming) {
+ break;
+ }
+ }
+
+ if (mEventDetector != null) {
+ synchronized (mEventListenerActions) {
+ for (Pair listenerAction : mEventListenerActions) {
+ EventListener listener = (EventListener) listenerAction.first;
+ if ((boolean) listenerAction.second) {
+ mEventDetector.registerListener(listener);
+ } else {
+ mEventDetector.unregisterListener(listener);
+ }
+ }
+ mEventListenerActions.clear();
+ }
+ }
+
+ int bytesWritten = mTunerHal.readTsStream(dataBuffer, dataBuffer.length);
+ if (bytesWritten <= 0) {
+ try {
+ // When buffer is underrun, we sleep for short time to prevent
+ // unnecessary CPU draining.
+ sleep(BUFFER_UNDERRUN_SLEEP_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ continue;
+ }
+
+ if (mTsStreamWriter != null) {
+ mTsStreamWriter.writeToFile(dataBuffer, bytesWritten);
+ }
+
+ if (mEventDetector != null) {
+ mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten);
+ }
+ synchronized (mCircularBufferMonitor) {
+ int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE);
+ int bytesToCopyInFirstPass = bytesWritten;
+ if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
+ bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
+ }
+ System.arraycopy(
+ dataBuffer, 0, mCircularBuffer, posInBuffer, bytesToCopyInFirstPass);
+ if (bytesToCopyInFirstPass < bytesWritten) {
+ System.arraycopy(
+ dataBuffer,
+ bytesToCopyInFirstPass,
+ mCircularBuffer,
+ 0,
+ bytesWritten - bytesToCopyInFirstPass);
+ }
+ mBytesFetched += bytesWritten;
+ mCircularBufferMonitor.notifyAll();
+ }
+ }
+
+ Log.i(TAG, "Streaming stopped");
+ }
+ }
+
+ /**
+ * Reads data from internal buffer.
+ *
+ * @param pos the position to read from
+ * @param buffer to read
+ * @param offset start position of the read buffer
+ * @param amount number of bytes to read
+ * @return number of read bytes when successful, {@code -1} otherwise
+ * @throws IOException
+ */
+ public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException {
+ while (true) {
+ synchronized (mCircularBufferMonitor) {
+ if (!mStreaming) {
+ return READ_ERROR_STREAMING_ENDED;
+ }
+ if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
+ Log.w(TAG, "Demux is requesting the data which is already overwritten.");
+ return READ_ERROR_BUFFER_OVERWRITTEN;
+ }
+ if (mBytesFetched < pos + amount) {
+ try {
+ mCircularBufferMonitor.wait(READ_TIMEOUT_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ // Try again to prevent starvation.
+ // Give chances to read from other threads.
+ continue;
+ }
+ int startPos = (int) (pos % CIRCULAR_BUFFER_SIZE);
+ int endPos = (int) ((pos + amount) % CIRCULAR_BUFFER_SIZE);
+ int firstLength = (startPos > endPos ? CIRCULAR_BUFFER_SIZE : endPos) - startPos;
+ System.arraycopy(mCircularBuffer, startPos, buffer, offset, firstLength);
+ if (firstLength < amount) {
+ System.arraycopy(
+ mCircularBuffer, 0, buffer, offset + firstLength, amount - firstLength);
+ }
+ mCircularBufferMonitor.notifyAll();
+ return amount;
+ }
+ }
+ }
+}