Skip to content

Commit

Permalink
#39, #34 Add option on stream creation to avoid closing the stream wh…
Browse files Browse the repository at this point in the history
…en an audio error occurs
  • Loading branch information
scriptorian committed Jan 10, 2020
1 parent d85f947 commit 2f937b7
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 49 deletions.
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,14 @@ Playing audio involves streaming audio data to a new instance of `AudioIO` confi
const fs = require('fs');
const portAudio = require('naudiodon');

// Create an instance of AudioIO with outOptions, which will return a WritableStream
// Create an instance of AudioIO with outOptions (defaults are as below), which will return a WritableStream
var ao = new portAudio.AudioIO({
outOptions: {
channelCount: 2,
sampleFormat: portAudio.SampleFormat16Bit,
sampleRate: 48000,
deviceId: -1 // Use -1 or omit the deviceId to select the default device
deviceId: -1, // Use -1 or omit the deviceId to select the default device
closeOnError: true // Close the stream if an audio error is detected, if set false then just log the error
}
});

Expand All @@ -131,13 +132,14 @@ Recording audio involves streaming audio data from a new instance of `AudioIO` c
var fs = require('fs');
var portAudio = require('../index.js');

// Create an instance of AudioIO with inOptions, which will return a ReadableStream
// Create an instance of AudioIO with inOptions (defaults are as below), which will return a ReadableStream
var ai = new portAudio.AudioIO({
inOptions: {
channelCount: 2,
sampleFormat: portAudio.SampleFormat16Bit,
sampleRate: 44100,
deviceId: -1 // Use -1 or omit the deviceId to select the default device
deviceId: -1, // Use -1 or omit the deviceId to select the default device
closeOnError: true // Close the stream if an audio error is detected, if set false then just log the error
}
});

Expand Down
31 changes: 11 additions & 20 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,19 @@ function AudioIO(options) {
let ioStream;

const doRead = size => {
audioIOAdon.read(size, (err, buf) => {
audioIOAdon.read(size, (err, buf, finished) => {
if (err)
process.nextTick(() => ioStream.emit('error', err));

ioStream.push(buf);
if (buf && buf.length < size)
ioStream.push(null);
ioStream.destroy(err);
else {
ioStream.push(buf);
if (finished)
ioStream.push(null);
}
});
};

const doWrite = (chunk, encoding, cb) => {
audioIOAdon.write(chunk, err => {
if (err)
process.nextTick(() => ioStream.emit('error', err));
cb();
});
audioIOAdon.write(chunk, err => cb(err));
}

const readable = 'inOptions' in options;
Expand Down Expand Up @@ -94,15 +91,9 @@ function AudioIO(options) {
});
}

ioStream.on('close', () => {
console.log('AudioIO close');
ioStream.quit();
});
ioStream.on('finish', () => {
console.log('AudioIO finish');
ioStream.quit();
});
ioStream.on('end', () => console.log('AudioIO end'));
ioStream.on('close', () => ioStream.quit());
ioStream.on('finish', () => ioStream.quit());

ioStream.on('error', err => console.error('AudioIO:', err));

return ioStream;
Expand Down
6 changes: 4 additions & 2 deletions scratch/smokeLoop.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ var ai = new portAudio.AudioIO({
channelCount: 2,
sampleFormat: portAudio.SampleFormat16Bit,
sampleRate: sampleRate,
deviceId: 1
deviceId: 2,
closeOnError: false
}
});

Expand All @@ -33,7 +34,8 @@ var ao = new portAudio.AudioIO({
channelCount: 2,
sampleFormat: portAudio.SampleFormat16Bit,
sampleRate: sampleRate,
deviceId: -1
deviceId: -1,
closeOnError: false
}
});

Expand Down
24 changes: 14 additions & 10 deletions src/AudioIO.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,36 @@ static class AllocFinalizer {
class ReadWorker : public Napi::AsyncWorker {
public:
ReadWorker(std::shared_ptr<PaContext> paContext, uint32_t numBytes, const Napi::Function& callback)
: AsyncWorker(callback, "AudioRead"), mPaContext(paContext), mNumBytes(numBytes)
: AsyncWorker(callback, "AudioRead"), mPaContext(paContext), mNumBytes(numBytes), mFinished(false)
{ }
~ReadWorker() {}

void Execute() {
mChunk = mPaContext->pullInChunk(mNumBytes);
mChunk = mPaContext->pullInChunk(mNumBytes, mFinished);
}

void OnOK() {
Napi::HandleScope scope(Env());

Napi::Value errVal = Env().Null();
Napi::Value bufVal = Env().Null();
std::string errStr;
if (mPaContext->getErrStr(errStr))
Callback().Call({Napi::String::New(Env(), errStr), Napi::Buffer<uint8_t>::New(Env(),0)});
else if (mChunk) {
if (mPaContext->getErrStr(errStr, /*isInput*/true))
errVal = Napi::String::New(Env(), errStr);
if (mChunk && mChunk->numBytes()) {
sOutstandingAllocs.emplace(mChunk->buf(), mChunk);
Napi::Object buf = Napi::Buffer<uint8_t>::New(Env(), mChunk->buf(), mChunk->numBytes(), sAllocFinalizer);
Callback().Call({Env().Null(), buf});
} else
Callback().Call({Env().Null(), Env().Null()});
bufVal = Napi::Buffer<uint8_t>::New(Env(), mChunk->buf(), mChunk->numBytes(), sAllocFinalizer);
}
Napi::Boolean finishedVal = Napi::Boolean::New(Env(), mFinished);

Callback().Call({errVal, bufVal, finishedVal});
}

private:
std::shared_ptr<PaContext> mPaContext;
uint32_t mNumBytes;
std::shared_ptr<Chunk> mChunk;
bool mFinished;
};

class WriteWorker : public Napi::AsyncWorker {
Expand All @@ -73,7 +77,7 @@ class WriteWorker : public Napi::AsyncWorker {
void OnOK() {
Napi::HandleScope scope(Env());
std::string errStr;
if (mPaContext->getErrStr(errStr))
if (mPaContext->getErrStr(errStr, /*isInput*/false))
Callback().Call({Napi::String::New(Env(), errStr)});
else
Callback().Call({Env().Null()});
Expand Down
20 changes: 12 additions & 8 deletions src/PaContext.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ void PaContext::stop(eStopFlag flag) {
Pa_Terminate();
}

std::shared_ptr<Chunk> PaContext::pullInChunk(uint32_t numBytes) {
std::shared_ptr<Chunk> PaContext::pullInChunk(uint32_t numBytes, bool &finished) {
std::shared_ptr<Memory> result = Memory::makeNew(numBytes);
bool finished = false;
uint32_t bytesRead = fillBuffer(result->buf(), numBytes, mInChunks, finished);
finished = false;
uint32_t bytesRead = fillBuffer(result->buf(), numBytes, mInChunks, finished, /*isInput*/true);
if (bytesRead != numBytes) {
if (0 == bytesRead)
result = std::shared_ptr<Memory>();
Expand Down Expand Up @@ -152,9 +152,13 @@ void PaContext::checkStatus(uint32_t statusFlags) {
}
}

bool PaContext::getErrStr(std::string& errStr) {
bool PaContext::getErrStr(std::string& errStr, bool isInput) {
std::lock_guard<std::mutex> lk(m);
errStr = mErrStr;
std::shared_ptr<streampunk::AudioOptions> options = isInput ? mInOptions : mOutOptions;
if (options->closeOnError()) // propagate the error back to the stream handler
errStr = mErrStr;
else if (mErrStr.length())
printf("AudioIO: %s\n", mErrStr.c_str());

This comment has been minimized.

Copy link
@psnyder

psnyder Feb 28, 2020

I wish this was going to stderr instead or had an option to ignore and NOT log. Then I could redirect errors to file, and it wouldn't mess up my terminal UI.

mErrStr.clear();
return !errStr.empty();
}
Expand All @@ -177,20 +181,20 @@ bool PaContext::readPaBuffer(const void *srcBuf, uint32_t frameCount) {
bool PaContext::fillPaBuffer(void *dstBuf, uint32_t frameCount) {
uint32_t bytesRemaining = frameCount * mOutOptions->channelCount() * mOutOptions->sampleBits() / 8;
bool finished = false;
fillBuffer((uint8_t *)dstBuf, bytesRemaining, mOutChunks, finished);
fillBuffer((uint8_t *)dstBuf, bytesRemaining, mOutChunks, finished, /*isInput*/false);
return !finished;
}

// private
uint32_t PaContext::fillBuffer(uint8_t *buf, uint32_t numBytes,
std::shared_ptr<Chunks> chunks,
bool &finished) {
bool &finished, bool isInput) {
uint32_t bufOff = 0;
while (numBytes) {
if (!chunks->curBuf() || (chunks->curBuf() && (chunks->curBytes() == chunks->curOffset()))) {
chunks->waitNext();
if (!chunks->curBuf()) {
printf("Finishing - %d bytes not available to fill the last buffer\n", numBytes);
printf("Finishing %s - %d bytes not available to fill the last buffer\n", isInput ? "input" : "output", numBytes);
memset(buf + bufOff, 0, numBytes);
finished = true;
break;
Expand Down
6 changes: 3 additions & 3 deletions src/PaContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ class PaContext {
void start(Napi::Env env);
void stop(eStopFlag flag);

std::shared_ptr<Chunk> pullInChunk(uint32_t numBytes);
std::shared_ptr<Chunk> pullInChunk(uint32_t numBytes, bool &finished);
void pushOutChunk(std::shared_ptr<Chunk> chunk);

void checkStatus(uint32_t statusFlags);
bool getErrStr(std::string& errStr);
bool getErrStr(std::string& errStr, bool isInput);

void quit();

Expand All @@ -63,7 +63,7 @@ class PaContext {

uint32_t fillBuffer(uint8_t *buf, uint32_t numBytes,
std::shared_ptr<Chunks> chunks,
bool &finished);
bool &finished, bool isInput);

void setParams(Napi::Env env, bool isInput,
std::shared_ptr<AudioOptions> options,
Expand Down
8 changes: 6 additions & 2 deletions src/Params.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class AudioOptions : public Params {
mChannelCount(unpackNum(env, tags, "channelCount", 2)),
mSampleFormat(unpackNum(env, tags, "sampleFormat", 8)),
mSampleBits(1 == mSampleFormat ? 32 : mSampleFormat),
mMaxQueue(unpackNum(env, tags, "maxQueue", 2))
mMaxQueue(unpackNum(env, tags, "maxQueue", 2)),
mCloseOnError(unpackBool(env, tags, "closeOnError", true))
{}
~AudioOptions() {}

Expand All @@ -83,6 +84,7 @@ class AudioOptions : public Params {
uint32_t sampleFormat() const { return mSampleFormat; }
uint32_t sampleBits() const { return mSampleBits; }
uint32_t maxQueue() const { return mMaxQueue; }
bool closeOnError() const { return mCloseOnError; }

std::string toString() const {
std::stringstream ss;
Expand All @@ -94,7 +96,8 @@ class AudioOptions : public Params {
ss << "sample rate " << mSampleRate << ", ";
ss << "channels " << mChannelCount << ", ";
ss << "bits per sample " << mSampleBits << ", ";
ss << "max queue " << mMaxQueue;
ss << "max queue " << mMaxQueue << ", ";
ss << "close on error " << (mCloseOnError ? "true" : "false");
return ss.str();
}

Expand All @@ -105,6 +108,7 @@ class AudioOptions : public Params {
uint32_t mSampleFormat;
uint32_t mSampleBits;
uint32_t mMaxQueue;
bool mCloseOnError;
};

} // namespace streampunk
Expand Down

0 comments on commit 2f937b7

Please sign in to comment.