Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates rxjs to version 6. #8

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
"make-dir": "^1.1.0",
"ramda": "^0.25.0",
"request": "^2.48.0",
"rxjs": "^5.5.4",
"rxjs": "^6.0.0",
"prettier": "^1.9.2"
}
}
69 changes: 33 additions & 36 deletions src/lib/observables/index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
const { map, flatMap, delay: rxjsDelay, retryWhen, mergeMap, tap, reduce } = require('rxjs/operators');
// TODO: add writing to file (make this separate module?)
// TODO: there is a big bug with "takeWhile": it would discard all results in a
// concurrent chunk if the first result happens to meet the takeWhile condition

const P = require('bluebird')
const R = require('ramda')
const Rx = require('rxjs/Rx')
const Rx = require('rxjs')
const chalk = require('chalk')
const makeDir = require('make-dir')
const path = require('path')
Expand Down Expand Up @@ -86,34 +87,31 @@ const createScraper = ({
const fromUrlsIterator = async (urlsIterator, scrapeWhile) => {
const urlsIteratorSubject = new IteratorSubject(urlsIterator)

const scrapingPromise = urlsIteratorSubject
// convert URL that are just strings to "urlWithContext object"
.map(url => (R.type(url) === 'String' ? { url } : url))
// merge map allows to control concurrency and retries, produces multiple observables
// from single observable and merges them back to one
.mergeMap(
({ url }) =>
O.of(url)
.flatMap(url => httpGet(url))
.delay(getDelay())
.retryWhen(
httpError({
maxRetries: retryAttempts,
backoffMs: retryBackoffMs,
exponentialBackoff: exponentialRetryBackoff,
logProgress,
onFinalRetryFail: err => {
onProgress(err)
failedCount += 1
urlsIteratorSubject.next()
},
})
),
const scrapingPromise = urlsIteratorSubject.pipe(
map(url => (R.type(url) === 'String' ? { url } : url)),
mergeMap(({ url }) =>
Rx.of(url).pipe(
flatMap(url => httpGet(url)),
rxjsDelay(getDelay()),
retryWhen(
httpError({
maxRetries: retryAttempts,
backoffMs: retryBackoffMs,
exponentialBackoff: exponentialRetryBackoff,
logProgress,
onFinalRetryFail: err => {
onProgress(err)
failedCount += 1
urlsIteratorSubject.next()
},
})
)
),
// result selector, defines shape of the observable data passed further
(urlWithContext, response) => ({ response, urlWithContext }),
concurrency
)
.do(({ response, urlWithContext }) => {
),
tap(({ response, urlWithContext }) => {
// this is the meat of this approach: we notify RxIteratorSubject to emit the next item,
// which means "pull next item from iterable", and since we subscribed to that very
// RxIteratorSubject we get an observable running that is capable of producing values
Expand All @@ -138,30 +136,29 @@ const createScraper = ({
)
}
}
})
.do(({ response, urlWithContext }) => {
}),
tap(({ response, urlWithContext }) => {
// increment success count only when scrape while is defined
if (scrapeWhile && scrapeWhile({ response })) {
successCount += 1
onProgress({ response, urlWithContext })
}
})
.map(scrapingFunc)
.reduce((results, currentResults) => {
}),
map(scrapingFunc),
reduce((results, currentResults) => {
if (!Array.isArray(results)) {
return results.concat([currentResults])
}

return results.concat(currentResults)
}, [])
// side-effects
.do(results => {
}, []),
tap(results => {
const totalCount = successCount + failedCount
if (totalCount === 1 || !logSummary) {
return
}
printSummary(results, successCount, failedCount)
})
}))
.toPromise()
.then(results =>
writeResultsToFile
Expand Down Expand Up @@ -195,7 +192,7 @@ const createScraper = ({
return urlsWithContext
? fromUrls(urlsWithContext)
: fromUrlsIterator(urlsIterator, scrapeWhile)
}
};
}

// creates an iterator from an array
Expand Down
2 changes: 1 addition & 1 deletion src/lib/observables/iterator-subject.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const Rx = require('rxjs/Rx')
const Rx = require('rxjs')

/**
* Represents an Rx Subject that is tied to a provided iterator in its constructor. Would
Expand Down
4 changes: 2 additions & 2 deletions src/lib/observables/utils/console-utils.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const chalk = require('chalk')
const Rx = require('rxjs/Rx')
const Rx = require('rxjs')

// Dumps any value to stdout through console.dir, with colours enabled and
// nested objects expanded four levels deep.
const log = data => {
  const inspectOptions = { colors: true, depth: 4 }
  return console.dir(data, inspectOptions)
}
// Writes an informational message to stdout, coloured cyan via chalk.
const logInfo = msg => {
  const cyanMessage = chalk.cyan(msg)
  return console.log(cyanMessage)
}
Expand All @@ -11,7 +11,7 @@ const logHttpErrorForObservable = (err, caught) => {
console.log(chalk.red(err))
console.log(chalk.red('Response headers:'))
console.dir(err.response.headers, { colors: true, depth: 4 })
return Rx.Observable.empty()
return Rx.empty();
}

module.exports = {
Expand Down
17 changes: 9 additions & 8 deletions src/lib/observables/utils/rx-utils.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const Rx = require('rxjs/Rx')
const { zip, mergeMap } = require('rxjs/operators');
const Rx = require('rxjs')
const R = require('ramda')
const { get } = require('./ramda-utils')
const { log, getJSON } = require('../../console-tools')
Expand All @@ -23,16 +24,16 @@ const httpError = ({
}) => errorsObservable =>
// maxRetries + 1 is used so we can get into retry code after last attempt
// and fail accordingly
Rx.Observable.range(1, maxRetries + 1)
Rx.range(1, maxRetries + 1).pipe(
// combine errors observable (enhanced with logging) with range observable
.zip(errorsObservable, (i, err) => ({
zip(errorsObservable, (i, err) => ({
attempt: i,
retryAfterMs: retryAfterGetter(err) || backoffMs,
err,
}))
})),
// waiting for "inner" observable before re-trying "outer" one
// mergeMap is same as flatMap
.mergeMap(x => {
mergeMap(x => {
const retryAfter = exponentialBackoff ? x.retryAfterMs * x.attempt : x.retryAfterMs
const err = x.err
const totalAttempts = maxRetries + 1
Expand All @@ -54,7 +55,7 @@ const httpError = ({
// return Rx.Observable.of('hello')

onFinalRetryFail(err)
return Rx.Observable.empty()
return Rx.empty();
}

logProgress &&
Expand All @@ -64,8 +65,8 @@ const httpError = ({
`, retrying after ${retryAfter}ms`
)

return Rx.Observable.timer(retryAfter)
})
return Rx.timer(retryAfter);
}))

module.exports = {
httpError,
Expand Down