Ingest Game-Streaming Data from the Twitch API

Nikolay Petrachkov (Nik for short) is a BI developer in Amsterdam by day, but in his spare time he combines his passion for games and data engineering by building a project to analyze game-streaming data from Twitch. Nik discovered StreamSets Data Collector when he was looking for a way to build data pipelines that deliver insights from gaming data without writing a ton of code. In this guest post, reposted from the original with his kind permission, Nik explains how he used StreamSets Data Collector to extract data about streams and games via the Twitch API. It's a great example of applying enterprise DataOps principles to a fun use case. Over to you, Nik…

In a previous post we explored the Twitch API using the Elixir programming language, in order to plan a process that acquires data from the Twitch API. Data acquisition is a common problem in data analysis and business intelligence. In data warehousing there is a process called ETL (Extract, Transform, Load), which describes how data flows from source systems to destinations. One way to acquire data is to write custom code for each source, which brings challenges of maintenance, flexibility, and reliability. The other way is to use a system that was built to solve the data acquisition problem.

StreamSets Data Collector

One of the more modern systems for data acquisition is StreamSets Data Collector. Data Collector contains connectors to many systems that act as origins or destinations. These include not only traditional systems such as relational databases, files on a local file system, and FTP, but also more modern ones such as Kafka, Hadoop FS, and cloud data stores, for example Amazon S3, Google BigQuery, and Microsoft Azure HDInsight. Another feature of Data Collector is its graphical user interface for building data pipelines from predefined blocks, or stages. Stages come in four categories:

  1. Origins (stages that define from where and how to acquire data);
  2. Processors (stages to transform data);
  3. Destinations (stages that define where data will be saved);
  4. Executors (stages that trigger a task when they receive an event).

Stages are described in detail in the Data Collector documentation.

Building Data Pipelines with Data Collector

We will build data pipelines to acquire data about streams and games on Twitch. The process to obtain data about streams and games looks like this:

  1. Find user’s username (e.g., from a Twitch URL);
  2. Make a request to the Twitch API to convert username to stream id;
  3. Make a request to the Twitch API (using stream_id) to obtain data about the user’s stream (is there a live stream, is there a recording being played);
  4. Make a request to the Twitch API (using the game name) to acquire data about the game’s stats on Twitch (how many viewers in total for this game);
  5. Make a request to the Giant Bomb API (using giantbomb_id) to acquire data about a particular game (genre and release date).

Converting User Name to stream_id

[Figure: Twitch name to stream_id pipeline]

First, I built a data pipeline to convert user names to stream ids. The data pipeline starts by watching a directory for new text files. Each text file is named after a user name and contains one line with that user name. The Data Collector pipeline reads new text files, saves the user name from each file into a record field called text, then uses that value in the HTTP Client processor to make a call to the Twitch API. After that, the Expression Evaluator processor (named Cast Date in the figure above) enriches the record with a captured_at field, which contains the parsed timestamp value of the Date field of the response, and moves the user name and stream_id from nested fields of the response to the root of the record. In terms of Data Collector expressions, flattening the stream_id looks as follows:

  • Field Expression contains code ${record:value('/result/users[0]/_id')};
  • Output field is /stream_id.
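As a rough illustration of what this first pipeline does with each file and each response, here is a minimal Python sketch. It assumes that each .txt file holds exactly one user name and that the Twitch response has the shape implied by the expression above (a users array whose first element carries _id and name). The function names are hypothetical, and the HTTP call itself is left out.

```python
import json
from pathlib import Path
from typing import List


def read_user_names(directory: Path) -> List[str]:
    """Read one user name per .txt file, similar to a directory-watching origin."""
    names = []
    for path in sorted(directory.glob("*.txt")):
        name = path.read_text(encoding="utf-8").strip()
        if name:  # skip empty files
            names.append(name)
    return names


def to_flat_record(response_body: str) -> dict:
    """Move name and _id from /result/users[0] to the root of the record.

    Mirrors ${record:value('/result/users[0]/_id')} -> /stream_id.
    """
    user = json.loads(response_body)["users"][0]
    return {"name": user["name"], "stream_id": user["_id"]}
```

For example, a made-up response body of {"users": [{"_id": "12345", "name": "some_streamer"}]} becomes {"name": "some_streamer", "stream_id": "12345"}.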

Originally, the Date field contained a date-time value in the following format: Sun, 08 Apr 2018 11:45:57 GMT. I used this expression to parse this date into a Data Collector date-time:

JDBC Producer (named Postgresql local in the figure) and Local FS (named Save to FS) are destination stages. JDBC Producer contains the JDBC connection string, schema, and table names. The database, schema, and table must exist before the pipeline is run. To save data into the table properly, the JDBC Producer stage contains a Field to Column Mapping, which maps fields of Data Collector records to columns in the table. In this case, the fields /name, /stream_id, and /captured_at were mapped to columns with the same names in the PostgreSQL database. Fields that are not in this mapping (and do not match column names in the table) are ignored and do not raise errors.
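To make the mapping behaviour concrete, the sketch below imitates it in Python, with the standard-library sqlite3 module standing in for PostgreSQL (an assumption made only so the example is self-contained). Only fields listed in the hypothetical FIELD_TO_COLUMN mapping are written; any other field in the record, such as the original text field, is silently skipped.

```python
import sqlite3

# Hypothetical mapping mirroring the JDBC Producer's Field to Column Mapping:
# record field path -> table column.
FIELD_TO_COLUMN = {
    "/name": "name",
    "/stream_id": "stream_id",
    "/captured_at": "captured_at",
}


def insert_record(conn: sqlite3.Connection, table: str, record: dict) -> None:
    """Insert the mapped fields of a flat record; unmapped fields are ignored.

    The table name is interpolated directly, so it must come from trusted config.
    """
    pairs = [
        (column, record[path.lstrip("/")])
        for path, column in FIELD_TO_COLUMN.items()
        if path.lstrip("/") in record
    ]
    columns = ", ".join(column for column, _ in pairs)
    placeholders = ", ".join("?" for _ in pairs)
    conn.execute(
        f"INSERT INTO {table} ({columns}) VALUES ({placeholders})",
        [value for _, value in pairs],
    )


# As with the JDBC Producer, the table has to exist before records arrive.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE twitch_users (name TEXT, stream_id TEXT, captured_at TEXT)")
insert_record(conn, "twitch_users", {
    "name": "some_streamer",
    "stream_id": "12345",
    "captured_at": "2018-04-08T11:45:57+00:00",
    "text": "some_streamer",  # not in the mapping, so it is not written
})
```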

The Local FS destination is used to save Data Collector records into files on a local file system. The stage is configured to save Data Collector records into JSON format (multiple JSON objects per file).
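Files written this way can be read back with a few lines of Python. The sketch below assumes one JSON object per line, which is how we would expect a multiple-objects-per-file output to look; if your files separate objects differently, the reader would need adjusting. The function names are hypothetical.

```python
import json
from pathlib import Path
from typing import List


def append_records(path: Path, records: List[dict]) -> None:
    """Append records to a file as JSON objects, one object per line."""
    with path.open("a", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")


def read_records(path: Path) -> List[dict]:
    """Read all records back, parsing each non-empty line as one JSON object."""
    with path.open(encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```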

Fetching data about streams

[Figure: Twitch streams pipeline]

The pipeline to fetch data about streams starts with a query (stage JDBC Query Consumer) that selects stream_ids from the PostgreSQL table populated by the previous pipeline. This SQL query is executed every 2 minutes, so although the pipeline runs continuously, new input appears only every two minutes. For each saved stream_id, the HTTP Client processor (named Fetch stream data from Twitch API) executes an HTTP request to obtain data about the current state of the stream. The next step (Filter empty streams) conditionally selects where Data Collector records are directed. If a stream is not active, the corresponding Data Collector record is discarded. If a stream is active, the Data Collector record is directed to output number 2 to be transformed in other stages.
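The filtering step can be sketched as a simple predicate. This assumes, as the field name result.stream.delay mentioned below suggests, that the HTTP Client stores the response under /result and that an offline channel comes back with a null stream object; is_live and split_streams are hypothetical names.

```python
from typing import Iterable, List, Tuple


def is_live(record: dict) -> bool:
    """True when the response stored under /result contains a non-null stream."""
    return (record.get("result") or {}).get("stream") is not None


def split_streams(records: Iterable[dict]) -> Tuple[List[dict], List[dict]]:
    """Route records like the Filter empty streams stage: (live, discarded)."""
    live, discarded = [], []
    for record in records:
        (live if is_live(record) else discarded).append(record)
    return live, discarded
```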

The Field Remover processor (named Remove Community ID) is used to remove unwanted fields from the Data Collector record. Initially it was set up to remove community ids, but it was later extended to also remove links to image previews and profile banners.

The next processor, Field Flattener (named Flatten field Result), flattens nested data so that all fields sit directly under the root and there are no nested fields. The following stage, Field Renamer (named Rename fields to contain underscores), renames fields to shorter names (from /'result.stream.delay' to /delay) or to use an underscore as the delimiter instead of a dot.
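The combined effect of these two stages can be sketched in Python as follows. The dot separator matches the field names quoted above; stripping the result.stream. prefix and replacing the remaining dots with underscores is our reading of how the renaming was configured, not the actual stage settings. Lists are left untouched in this sketch.

```python
def flatten(record: dict, prefix: str = "") -> dict:
    """Flatten nested dicts into one level with dot-joined keys,
    e.g. {"result": {"stream": {"delay": 0}}} -> {"result.stream.delay": 0}."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat


def rename_fields(record: dict, strip_prefix: str = "result.stream.") -> dict:
    """Drop a common prefix, then replace any remaining dots with underscores."""
    renamed = {}
    for key, value in record.items():
        if key.startswith(strip_prefix):
            key = key[len(strip_prefix):]
        renamed[key.replace(".", "_")] = value
    return renamed
```

For instance, result.stream.delay becomes delay, and result.stream.channel.name becomes channel_name.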

One of the record fields is average_fps, which sometimes contains an integer value and sometimes a floating-point value. A Field Type Converter is used to force the average_fps field to a float. The remaining stages, Expression Evaluator (used to parse date-time values from strings), JDBC Producer, and Local FS, are similar to the stages in the previous data pipeline.
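Forcing a single type matters because the target database column has one type regardless of what the API returns. A minimal Python equivalent of this conversion, using a hypothetical helper name, looks like this:

```python
from typing import Optional


def force_float(record: dict, field: str = "average_fps") -> dict:
    """Return a copy of the record with `field` converted to float (None is kept)."""
    converted = dict(record)
    value: Optional[object] = converted.get(field)
    if value is not None:
        converted[field] = float(value)
    return converted
```

With this helper, {"average_fps": 60} becomes {"average_fps": 60.0}, and a value that is already a float is left unchanged.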

Fetching data about games on Twitch

[Figure: Twitch games pipeline]

The pipeline that fetches data about games on Twitch looks very similar to the pipeline that fetches data about streams. It starts with a JDBC Query Consumer, which every 5 minutes executes a SQL query to select unique game names from the last 5 minutes; then, for each game name, it makes an HTTP request to the Twitch API to find the game by name. The Twitch API might return an array with multiple results; we assume that the first result is the correct one. The Field Renamer processor (named Flatten first result) renames nested fields so that they are directly under the root. The Field Remover (named Remove all unused fields) is configured to keep only the fields /giantbomb_id, /_id, /name, and /popularity; all other fields are removed. The rest of the pipeline adds a new timestamp field, /captured_at, to records and saves them to the PostgreSQL database and into files on the local file system.
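Picking the first search result and keeping only those four fields can be sketched as below. The response shape (a games array that may be empty or null) is an assumption about the Twitch search response, and first_game is a hypothetical name.

```python
from typing import Optional

# Fields kept by the "Remove all unused fields" stage.
KEEP_FIELDS = ("giantbomb_id", "_id", "name", "popularity")


def first_game(response: dict) -> Optional[dict]:
    """Take the first search hit (assumed to be the right game), keeping only KEEP_FIELDS."""
    games = response.get("games") or []
    if not games:
        return None  # no match: nothing to save
    first = games[0]
    return {key: first[key] for key in KEEP_FIELDS if key in first}
```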

Fetching data about games from Giant Bomb

[Figure: Giant Bomb pipeline]

The pipeline that fetches data about games from the Giant Bomb API follows the same pattern as the previous pipelines that fetch data from the Twitch API:

  • Execute a SQL query to get data from a database;
  • Make a request to the Giant Bomb API;
  • Filter responses;
  • Move fields from nested fields to be directly under root (flatten fields);
  • Remove unwanted fields and enrich records with time stamp captured_at;
  • Save data to PostgreSQL database and to JSON files on local file system.


Problems I Encountered

Time Zones

Dealing with time zones is a common problem in software development. It was difficult to compare the value of a parsed date-time field in a Data Collector record with the timestamp string from the HTTP response, because the original string contains a date in the GMT time zone, while the parsed value was represented in the local time zone. The fix was a matter of specifying the correct offset when parsing the date string. It would probably be a good idea to run the operating system, Data Collector, and PostgreSQL all in the UTC time zone to avoid confusion.
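The Data Collector expression itself is specific to its expression language, but the idea behind the fix can be shown in Python: parse the Date value with the standard library's email.utils.parsedate_to_datetime and keep the result explicitly in UTC, instead of letting it be interpreted in the local time zone. This is an illustrative sketch, not the expression used in the pipeline.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def parse_http_date(value: str) -> datetime:
    """Parse an HTTP Date value such as 'Sun, 08 Apr 2018 11:45:57 GMT' into UTC."""
    parsed = parsedate_to_datetime(value)
    if parsed.tzinfo is None:
        # A '-0000' offset is returned as a naive datetime; treat it as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


captured_at = parse_http_date("Sun, 08 Apr 2018 11:45:57 GMT")
print(captured_at.isoformat())  # 2018-04-08T11:45:57+00:00
```

Because the result carries an explicit UTC offset, comparing it with the original GMT string no longer depends on the machine's local time zone.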

LZ4 Compression

The Local FS destination, which saves data to local files, was initially set up to compress output files with LZ4 compression. A day later I tried to decompress those LZ4 files and failed. On the LXLE 16.04.3 Linux distribution (based on Ubuntu 16.04) and on Fedora 27, the lz4 command-line tool failed to decompress those files with an error:

After this, I changed the compression setting to GZIP and that worked well.
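GZIP output is easy to inspect with standard tools such as zcat, or from Python with the standard-library gzip module. The sketch below assumes the same one-object-per-line layout as the uncompressed JSON files; read_gzipped_records is a hypothetical helper.

```python
import gzip
import json
from pathlib import Path
from typing import List


def read_gzipped_records(path: Path) -> List[dict]:
    """Read a GZIP-compressed file of JSON objects, one object per line."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```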

Extra Data in Saved Files

The data pipeline that converts Twitch names to Twitch IDs does not contain stages that remove unwanted fields. This causes no problems for the JDBC Producer destination, which saves data to PostgreSQL, because only mapped fields are considered and the rest are ignored. The Local FS destination, however, saves the whole Data Collector record into a file, so the saved JSON contains the data from the text file, the added fields, and the whole HTTP response.

Positive things about Data Collector

  1. Errors in stages do not stop the whole pipeline.
  2. When a pipeline fails (in these pipelines errors were due to DNS errors, when addresses for HTTP requests could not be resolved) it does not affect the whole system.
  3. Exported pipeline definitions are JSON files, so they can easily be committed to Git, and git diff shows meaningful results.
  4. Pipeline preview. This is a great feature for exploring data as it flows through a pipeline and for debugging purposes.

[Figure: Data Collector preview]


Refactoring

Twitch API pipelines that fetch data for streams and games were kept separate to avoid excessive requests to the Twitch API: when several streamers are playing the same game at the same time, it makes sense to make only one request to fetch data about the game instead of one request per live stream. While browsing through the Data Collector stages, I spotted a processor that might solve the duplicated-requests problem: Record Deduplicator. Using this stage, I merged the two pipelines that fetch data about Twitch streams and Twitch games into a single new pipeline.

[Figure: Refactored pipeline to fetch Twitch streams and games]

In the refactored pipeline, the Record Deduplicator (named Deduplicate game names) was set to deduplicate the /game field over the last 180 seconds (parameter Time to Compare). This pipeline has been running for several days in place of the two previous pipelines without any problems.
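The idea behind time-window deduplication can be sketched in Python. This is our own simplified model, not Data Collector's implementation: a value passes only if the same value has not passed within the last window seconds, so each game name triggers at most one Twitch request per 180-second window. The class name and the injectable clock are hypothetical.

```python
import time
from typing import Callable, Dict, Hashable


class TimeWindowDeduplicator:
    """Let a key through only if it has not passed within the last `window` seconds."""

    def __init__(self, window: float = 180.0, clock: Callable[[], float] = time.monotonic):
        self.window = window
        self.clock = clock
        self._last_passed: Dict[Hashable, float] = {}

    def is_unique(self, key: Hashable) -> bool:
        now = self.clock()
        # Forget keys older than the window so memory stays bounded.
        self._last_passed = {
            k: t for k, t in self._last_passed.items() if now - t < self.window
        }
        if key in self._last_passed:
            return False  # duplicate within the window: skip the request
        self._last_passed[key] = now
        return True
```

Injecting the clock keeps the class easy to test; in a real loop, the default time.monotonic is unaffected by wall-clock adjustments.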

Results of Running Data Pipelines with Data Collector for 1 Month

  • Size of the log file: 139 MB
  • Number of crashes: 0
  • Number of Twitch channels that were observed: 20
  • Number of events captured for Twitch streams: 83534
  • Number of events captured for Twitch games: 32224
  • Number of records saved for Giant Bomb games: 555


Conclusion

We implemented several data pipelines that interact with the Twitch API to obtain data about streams and streamed games, and with the Giant Bomb API to fetch data about games. These data pipelines were implemented using StreamSets Data Collector, which provided all of the components needed to implement the designed pipelines: JDBC connection, HTTP client, a component to interact with the local file system, and components to transform data. Apart from the components we used, Data Collector provides a wide range of other components to interact with Hadoop FS, Kafka, Hive, Amazon S3, Azure, Redis, and more.

Extensive documentation and a good UI helped with learning how to develop pipelines with Data Collector. I used the “Pipeline Preview” feature extensively to explore data as it flowed through each pipeline and to debug them.

Errors that appeared in one month of running Data Collector were related to the network. They affected particular stages (only HTTP Client) or whole pipelines (when the address of the Twitch API could not be resolved), but never stopped the whole system. Because Data Collector did not crash even once in a month, it is a good tool for our use case: the data is streamed and can be captured only “now or never”, meaning that data not captured at a given moment is gone forever.

The definition of a pipeline can be saved into a JSON file and placed under version control. The only downside is that the JSON files contain the positions of UI elements, so if those positions change, Git shows the change as a diff.

Data Collector is an open source project with source code hosted on GitHub, and StreamSets offers several installation options, ranging from building from source code to running a Docker container.

We used the tarball option, and the setup was almost non-existent (extract the archive and start a script).

These data pipelines ran on an old laptop with an Intel SU2300 CPU (2 cores, 1.2 GHz) and 3 GB of RAM, using Data Collector's default configuration.

Use of Acquired Data

The acquired data allows us to answer some questions about streams, such as “Which channel was online the most in some period?” or “Which channel had the longest streaming streak?”

To compute which channels were online the most, we assume that the number of captured events can be used to compare how long each channel was online. Running a simple query in psql shows that the channels “burkeblack”, “nuke73”, and “lirik” were online the most.
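A query of this kind might look like the sketch below, run through sqlite3 so that it is self-contained. The table and column names (streams, channel_name, captured_at) are hypothetical, since the post does not show its schema. Because the streams pipeline polls every 2 minutes and keeps one event per live channel per poll, multiplying the event count by 2 gives only a rough estimate of online minutes.

```python
import sqlite3
from typing import List, Tuple

# Hypothetical schema: one row per captured event for a live stream.
QUERY = """
SELECT channel_name,
       COUNT(*)     AS events,
       COUNT(*) * 2 AS approx_minutes  -- one event per 2-minute poll
FROM streams
WHERE captured_at >= ? AND captured_at < ?
GROUP BY channel_name
ORDER BY events DESC
"""


def channels_by_online_time(
    conn: sqlite3.Connection, start: str, end: str
) -> List[Tuple[str, int, int]]:
    """Return (channel_name, events, approx_minutes) rows, most online first."""
    return conn.execute(QUERY, (start, end)).fetchall()
```

A channel with no events inside the chosen period simply does not appear in the result.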

[Figure: Channels by online time]

Interestingly, 20 channels were added for observation, but this SQL query returned only 19. Channel names were added over the course of several days, and 2018-04-10 was the first full day on which no more channels were added; one channel last streamed on 2018-04-09 and did not stream again while the data pipelines were running.

Finding the longest streaming streak for each channel is more difficult. A post on the Pivotal Engineering blog described a similar problem in relation to trail rides. The results of adapting their SQL query to our use case show that “lirik” had a streaming streak of 20 days out of the 30 days observed. Only live streams were considered for this query.
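The Pivotal post solves this in SQL. The same quantity, the longest run of consecutive calendar days with at least one live event, can also be computed in a few lines of Python, which may be easier to follow. The input format of (channel, captured_at) pairs and the function names are assumptions for this sketch.

```python
from collections import defaultdict
from datetime import date, datetime, timedelta
from typing import Dict, Iterable, Set, Tuple


def longest_streak(days: Iterable[date]) -> int:
    """Length of the longest run of consecutive calendar days."""
    best = current = 0
    previous = None
    for day in sorted(set(days)):
        if previous is not None and day - previous == timedelta(days=1):
            current += 1  # the run continues
        else:
            current = 1   # a gap (or the first day) starts a new run
        best = max(best, current)
        previous = day
    return best


def streaks_by_channel(events: Iterable[Tuple[str, datetime]]) -> Dict[str, int]:
    """Group live-stream events by channel and compute each channel's longest streak."""
    days: Dict[str, Set[date]] = defaultdict(set)
    for channel, captured_at in events:
        days[channel].add(captured_at.date())
    return {channel: longest_streak(d) for channel, d in days.items()}
```

Several events on the same day count once, so only the set of days with at least one live event matters.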

[Figure: Channels by streaming streak]

These are examples of ad hoc analysis with psql. To take analysis to the next level, with visualizations that can be shared with other people, there are plenty of tools, for example the open-source Apache Superset (incubating) and Metabase.

Conclusion

Many thanks to Nik for taking the time to write up his experiences with StreamSets Data Collector! Are you using Data Collector outside your day job? Let us know in the comments – we'd love to feature your use case in a future guest post!
