Crawling paginated json files

Hi, I have a JSON feed that I have managed to index and create a collection from, using the filter class JSONToXML.

The JSON feed is paginated and I need to get the crawl to go to the next page. I have a pagination "next" link set in the raw JSON data:

 `"pagination":{"next":"http:\/\/example.com/feed?page=2"}` 

I have tried to use the following:
crawler.link_extraction_group=1 crawler.link_extraction_regular_expression="next":"(.+?)"
to retrieve the next page link but have hit a wall: there is no error when running the crawl, but it's not going past the submitted page of the feed. Does the filter class get run before the crawler classes are run? If so, should the regex be looking for XML-formatted tags instead?

Howdy jdentice

Unfortunately, Funnelback only extracts links from HTML documents.

For JSON endpoints which require pagination, I would recommend going down the path of creating a custom collection as it would provide you with greater control over the interactions. Below is an example for stack overflow that I am working on for an internal project:

JSON endpoint: Usage of /posts [GET] - Stack Exchange API

custom_gather.groovy

package com.funnelback.internal

import com.funnelback.common.*
import com.funnelback.common.config.*
import com.funnelback.common.io.store.*
import com.funnelback.common.io.store.bytes.*
import com.funnelback.common.views.StoreView
import com.google.common.collect.*
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.logging.log4j.LogManager

import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream

// Logger used for the end-of-run summary
def logger = LogManager.getLogger("com.funnelback.internal.custom_gather.stack_overflow")

// Prefix shared by the script-specific collection.cfg keys read below
final String CONFIG_PREFIX = "custom_gather.stack_overflow"
final int DEFAULT_MAX_STORED = 1000000

// Path of the posts endpoint, appended to the configured API host
final String POST_API_PATH = "/2.2/posts"

if (args.length < 2) {
	// Single quotes on purpose: inside a double-quoted Groovy string
	// $SEARCH_HOME is interpolated as a script variable, which does not
	// exist, so the usage message itself would fail with a
	// MissingPropertyException instead of being printed.
	println 'Usage: script $SEARCH_HOME collection-name'
	return 1
}

def searchHome = new File(args[0])
def collectionName = args[1]

// Create a configuration object to read collection.cfg
def config = new NoOptionsConfig(searchHome, collectionName)

// Pause (in milliseconds) used to throttle the gather, and the timeout
// (in milliseconds) applied to each API connection
def requestDelay = config.valueAsInt("gather.request_delay", 50)
def connectionTimeout = config.valueAsInt(CONFIG_PREFIX + ".connection.timeout", 60000)

// Maximum number of records to store
def maxStored = config.valueAsInt(CONFIG_PREFIX + ".max_stored", DEFAULT_MAX_STORED)

// Configurations for customising the posts API request
def stackOverflowUrl = config.value(CONFIG_PREFIX + ".stack_overflow_url", "https://api.stackexchange.com")
def teamUrl = config.value(CONFIG_PREFIX + ".team_url", "")
def siteName = config.value(CONFIG_PREFIX + ".site_name", "stackoverflow")
def appKey = config.value(CONFIG_PREFIX + ".key", "")
def filter = config.value(CONFIG_PREFIX + ".filter", "withbody")
def pageSize = config.valueAsInt(CONFIG_PREFIX + ".page_size", 20)

// Authentication to access the stack overflow APIs
def accessToken = config.value(CONFIG_PREFIX + ".access_token", "")

// Create a Store instance to store gathered data in Funnelback
def store = new RawBytesStoreFactory(config)
	.withFilteringEnabled(true)
	.newStore()

store.open()

// Metadata associated with every stored document
def metadata = ArrayListMultimap.create()
metadata.put("Content-Type", "application/json")

// Create the URL using the information from collection.cfg
def base = stackOverflowUrl + POST_API_PATH + "?"

def params = [
	"order" : "desc",
	"sort" : "activity",
	"site" : siteName,
	"key" : appKey,
	"filter" :  filter,
	"team" : teamUrl,
	"pagesize" : pageSize
]

// Parameters whose value is an empty string are left out of the query string.
// Values are inserted verbatim (no URL-encoding is applied here), so anything
// that needs escaping must already be encoded in collection.cfg.
def basePostUrl =  base + params
	.findAll{ k,v -> v != "" }
	.collect { k,v -> "$k=$v" }.join('&')

config.progressMessage = "Fetching questions and answers from '${basePostUrl}'"

println config.progressMessage

// Loop state: whether another page should be requested, how many records
// have been stored so far, and the (1-based) page to request next
def hasNext = true
def stored = 0
def currentPage = 1

// Pages through the posts endpoint, storing every returned item, until the
// API reports no further pages, the update is stopped, or maxStored is hit.
try {
	while (hasNext && !config.updateStopped && stored < maxStored) {

		// Construct the url for the current page
		def currentUrl = new URL(basePostUrl + "&page=${currentPage++}")

		// Get the data from Stack Overflow. Note that data from their APIs is gzipped
		def postData = getJSONFromGzipSource(currentUrl, connectionTimeout, accessToken)

		// A response without an "items" array is treated as an empty page
		def items = postData["items"] ?: []

		// Add each returned item to the Funnelback store
		for (item in items) {
			// Never store more than the configured maximum, even mid-page
			if (stored >= maxStored) {
				break
			}

			def url = item.link
			def record = new RawBytesRecord(
				JsonOutput.toJson(item).getBytes("UTF-8"),
				url as String)
			store.add(record, metadata)
			println "  Stored ${url}"

			stored++
		}

		def total = postData.total
		def quotaMax = postData.quota_max
		def quotaRemaining = postData.quota_remaining

		println "Quota remaining: ${quotaRemaining}/${quotaMax}"

		// Update the UI with the current progress
		if (stored % 100 == 0) {
			config.progressMessage = "Fetched ${stored}/${total} questions and answers"
			println config.progressMessage
		}

		// Decide whether to request another page.
		//  - An empty page always ends the gather; otherwise a response that
		//    keeps reporting more results without returning any would loop forever.
		//  - "has_more" is preferred because "total" is only present when the
		//    configured filter asks for it; comparing against a missing total
		//    evaluates to false and would stop the gather after the first page.
		if (items.isEmpty()) {
			hasNext = false
		} else if (postData.has_more != null) {
			hasNext = postData.has_more as boolean
		} else {
			hasNext = (total != null && stored < total)
		}

		// Sets a delay between connections. Useful to prevent exceeding the
		// API rate limit. When the response carries a "backoff" value (seconds)
		// the longer of the two waits is used.
		if (hasNext) {
			long backoffMillis = (postData.backoff ?: 0) * 1000L
			long pauseMillis = Math.max(requestDelay as long, backoffMillis)
			if (pauseMillis > 0) {
				Thread.sleep(pauseMillis)
			}
		}
	}
}
catch (Exception e){
	println e.message
	println e.toString()
	// Keep the stack trace; the two printlns above only show the message
	logger.error("Gather aborted after storing " + stored + " record(s)", e)
}
finally {
	// Always release the store, whether or not the gather succeeded
	store.close()
}

println "Fetched ${stored} questions and answers from stackoverflow"
logger.info("Fetched ${stored} questions and answers from stackoverflow")

/**
	* Helper functions for the script 
  */ 
/**
 * Downloads a gzip-compressed JSON document and parses it.
 *
 * @param url          java.net.URL to fetch
 * @param timeout      connect and read timeout, in milliseconds
 * @param accessToken  optional token; when non-empty it is sent in both the
 *                     "access_token" and "X-API-Access-Token" request headers
 * @return the parsed JSON as returned by JsonSlurper
 * @throws IOException if the connection fails or the body is not gzip data
 */
def getJSONFromGzipSource(def url, def timeout, def accessToken = "") {
	println "Contacting ${url} …"
	def cnx = url.openConnection()
	cnx.setRequestProperty("Accept-Charset", "UTF-8")
	cnx.setRequestProperty("Accept", "application/json")

	// Only send the authentication headers when a token is actually configured
	if (accessToken) {
		cnx.setRequestProperty("access_token", accessToken)
		cnx.setRequestProperty("X-API-Access-Token", accessToken)
	}

	cnx.setConnectTimeout(timeout)
	cnx.setReadTimeout(timeout)

	cnx.connect()

	// Content from stack overflow is gzipped. We must unzip it first
	// before we can use it
	def unzip = new GZIPInputStream(cnx.inputStream)
	try {
		// Get the data from the API
		return new JsonSlurper().parse(unzip)
	} finally {
		// Closing the gzip stream also closes the underlying connection stream
		unzip.close()
	}
}

collection.cfg:

admin_email=gtran@funnelback.com
collection=onesearch-squiz-stack-overflow-custom
collection_type=custom
custom_gather.stack_overflow.team=https://forums.funnelback.com
custom_gather.stack_overflow.max_stored=500
custom_gather.stack_overflow.team_url=stackoverflow.com%2Fc%2Fsquiz%0A
custom_gather.stack_overflow.site_name=stackoverflow
custom_gather.stack_overflow.key=<key>
custom_gather.stack_overflow.filter=!3ykawIdPoxO)fKIi4
custom_gather.stack_overflow.access_token=<access_token>
data_report=false
filter=true
filter.classes=JSONToXML
query_processor_options=-stem=2 -weight_only_fields=[K] -SF=[.*]
gather=custom-gather
group.project_id=OneSearch
service_name=OneSearch - Squiz - Stack Overflow
start_url=
store.record.type=RawBytesRecord

Handy Tip
If you are running Funnelback from a vagrant machine, you can test custom gather scripts using the following command:

/opt/funnelback/tools/groovy/bin/groovy -cp "/opt/funnelback/lib/java/all/*" "/opt/funnelback/conf/<collection_name>/custom_gather.groovy" $SEARCH_HOME <collection_name>

Hope this helps.

~Gioan

Here’s another script that we use to crawl jira apis:

package com.funnelback.internal

import com.funnelback.common.*
import com.funnelback.common.config.*
import com.funnelback.common.io.store.*
import com.funnelback.common.io.store.bytes.*
import com.funnelback.common.views.StoreView
import com.google.common.collect.*
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.logging.log4j.LogManager;

// Prefix shared by the script-specific collection.cfg keys read below
final String CONFIG_PREFIX = "custom_gather.jira"
final int DEFAULT_MAX_STORED = 1000000

if (args.length < 2) {
	println 'Usage: script $SEARCH_HOME collection-name'
	return 1
}

def searchHome = new File(args[0])
def collectionName = args[1]

// Create a configuration object to read collection.cfg
def config = new NoOptionsConfig(searchHome, collectionName)

// Comma-separated project keys; surrounding whitespace and empty entries are
// dropped so that "A, B" matches project "B"
def projectBlacklist = config.value(CONFIG_PREFIX + '.project.blacklist', '')
	.tokenize(',')*.trim().findAll { it }
println "The following projects are blacklisted: ${projectBlacklist}"

def projectWhitelist = config.value(CONFIG_PREFIX + '.project.whitelist', '')
	.tokenize(',')*.trim().findAll { it }
println "The following projects are whitelist: ${projectWhitelist}"

// Time between requests. The script-specific key wins; otherwise fall back to
// the generic gather.request_delay setting, which is the key the "Sleeping
// for ..." log message names.
def requestDelay = config.valueAsInt(CONFIG_PREFIX + '.request_delay',
	config.valueAsInt('gather.request_delay', 50))

// Maximum number of records to store
def maxStored = config.valueAsInt(CONFIG_PREFIX + '.max_stored', DEFAULT_MAX_STORED)

def jiraUrl = config.value(CONFIG_PREFIX + '.url')
def jiraUser = config.value(CONFIG_PREFIX + '.username')
def jiraPassword = config.value(CONFIG_PREFIX + '.password')

// Without a base URL no request can be built, so stop before opening the store
if (!jiraUrl) {
	println "Missing required setting ${CONFIG_PREFIX}.url"
	return 1
}

// Basic-auth credentials; encode with an explicit charset so the result does
// not depend on the platform default
def auth = (jiraUser + ":" + jiraPassword).getBytes("UTF-8").encodeBase64().toString()

// Paging state: index of the first issue to request next, requested page
// size, total reported by the server, and number of records stored so far
def isLast = false
def start = 0
def page = config.valueAsInt(CONFIG_PREFIX + '.pagesize', 100)
def total = 0
def stored = 0

// Create a Store instance to store gathered data in Funnelback
def store = new RawBytesStoreFactory(config)
	.withFilteringEnabled(true)
	.newStore()

store.open()

// Metadata associated with every stored document
def metadata = ArrayListMultimap.create()
metadata.put("Content-Type", "application/json")

config.progressMessage = "Fetching JIRA tickets from '${jiraUrl}'', via user '${jiraUser}''"
println config.progressMessage

// Connect and read timeouts share one setting; read it once instead of twice
// per request
def connectionTimeout = config.valueAsInt(CONFIG_PREFIX + '.connection.timeout', 60000)

// Pages through the Jira search endpoint, storing every issue that passes the
// project blacklist/whitelist, until the last page is reached, the update is
// stopped, or maxStored records have been stored.
try {
	while (!isLast && !config.updateStopped && stored < maxStored) {
		// Get a connector to jira
		def url = new URL("${jiraUrl}/rest/api/2/search?fields=*all&startAt=${start}&maxResults=${page}")
		println "Contacting ${url} …"
		def cnx = url.openConnection()
		cnx.setRequestProperty("Authorization", "Basic ${auth}")
		cnx.setRequestProperty("Accept", "application/json")
		cnx.setConnectTimeout(connectionTimeout)
		cnx.setReadTimeout(connectionTimeout)

		cnx.connect()

		// Get the data from the API, making sure the response stream is closed
		def body = cnx.inputStream
		def data
		try {
			data = new JsonSlurper().parse(body)
		} finally {
			body.close()
		}

		// A response without an "issues" array is treated as an empty page
		def issues = data["issues"] ?: []

		// Ignore things on the blacklist
		// If a whitelist has been specified, only include those on that list
		def wanted = issues.findAll { issue ->
			def projectKey = issue.fields.project.key
			!projectBlacklist.contains(projectKey) &&
				(projectWhitelist.size() == 0 || projectWhitelist.contains(projectKey))
		}

		// Add issues to the store
		for (issue in wanted) {
			// Never store more than the configured maximum, even mid-page
			if (stored >= maxStored) {
				break
			}

			def record = new RawBytesRecord(
				JsonOutput.toJson(issue).getBytes("UTF-8"),
				"${jiraUrl}/browse/${issue.key}" as String)
			store.add(record, metadata)
			println "Stored ${issue.key}"
			stored++
		}

		// Advance by the number of issues actually returned rather than by the
		// requested page size: if the server returns fewer issues than were
		// asked for, stepping by the requested size would skip the difference.
		start += issues.size()
		total = data["total"]

		// ">=" (not ">"): once start equals total there is nothing left to
		// fetch. An empty page also ends the loop, since start no longer moves.
		if (issues.isEmpty() || start >= total) {
			isLast = true
		}

		config.progressMessage = "Fetched ${start} / ${total} issues"
		println config.progressMessage

		// Sets a delay between connections. Useful to prevent exceeding the
		// API rate limit
		if (requestDelay > 0) {
			println "Sleeping for ${requestDelay}ms (gather.request_delay)"
			Thread.sleep(requestDelay)
		}
	}
} finally {
	// Always release the store, whether or not the gather succeeded
	store.close()
}

println "Fetched ${total} JIRA tickets, stored ${stored}"

/**
 * Removes every character that is not allowed in an XML 1.0 document.
 *
 * Kept: tab, line feed, carriage return, U+0020-U+D7FF, U+E000-U+FFFD and the
 * supplementary planes U+10000-U+10FFFF. Everything else (other control
 * characters, unpaired surrogates, U+FFFE/U+FFFF) is dropped.
 *
 * @param str text to clean
 * @return the text with all disallowed characters removed
 */
String fixInvalidChars(str) {
	// Code points are written as \x{...}: a \u escape takes exactly four hex
	// digits, so "\u10000" would be read as U+1000 followed by a literal "0"
	// and the supplementary range would never be matched.
	return (str =~ /[^\x{9}\x{A}\x{D}\x{20}-\x{D7FF}\x{E000}-\x{FFFD}\x{10000}-\x{10FFFF}]+/).replaceAll('')
}

/**
 * Prefixes JSON keys that look like image sizes (e.g. "48x48", "24x24")
 * with "img", turning "48x48": into "img48x48":.
 *
 * @param str JSON text to rewrite
 * @return the text with every matching key renamed
 */
String fixInvalidField(str) {
	def sizeKeys = str =~ /"(\d+x\d+)":/
	sizeKeys.replaceAll('"img$1":')
}

collection.cfg

admin_email=gtran@funnelback.com
changeover_percent=0
collection=onesearch-squiz-jira-custom
collection_type=custom
data_report=false
filter=true
filter.classes=JSONToXML
gather=custom-gather
gather.request_delay=1000
group.project_id=OneSearch
custom_gather.jira.max_stored=100
custom_gather.jira.password=<password>
custom_gather.jira.url=https://jira.squiz.net
custom_gather.jira.username=<username>
custom_gather.jira.project.blacklist=FUNTEST,FUNDEMO,FUNDOC,FUNSU,FUNSMS,FUNPS,FUNPC,FUN,RNDSUPPORT,FBPM,FUNSI,APD,FUNNELAPPS,DOTCOM,TRAINING,PAR,FM,FUNTRANET,FBPP,FUNSPM,SCALESTENC
query_processor_options=-stem=2 -SM=both -num_ranks=20 -SF=[.*]
service_name=Onesearch - Squiz - Jira
start_url=
store.record.type=XmlRecord

Thanks Gioan will give it a go! :slight_smile:

happy to help :slight_smile: I got a few extra examples if ya need. Just lemme know and I’ll shoot them on through.

This is actually possible with the original setup. The key points are:

  • Link parsing runs prior to the filter step, so you should parse on JSON
  • Take care of whitespace!

The settings that worked for me in a local test (mimicking your JSON layout):

crawler.link_extraction_group=1
crawler.link_extraction_regular_expression="next"\s*:\s*"(.*?)"