最新のTwitter API「ChirpUserStreams」をPythonから使う


Twitter API「ChirpUserStreams」

Twitterが新しいAPI「ChirpUserStreams」を発表しました。

このAPIを使うことで、Twitterの自分のメインページで流れてくるようなつぶやき情報が、リアルタイムにストリーミングとして取得できるようになりました。

普通、TwitterAPIは使用回数に制限があるため、つぶやき情報やユーザーの情報を得ようとして何度もAPIを使っているうちに、制限に引っかかることがしばしばありました。

しかしながら、このストリーミングAPIは基本的にTwitterのサーバにつなぎっぱなしです。

つなぎっぱなしで、次から次へとフォロワーのつぶやき情報、Retweet情報、ダイレクト・メッセージ情報、Favorite情報などが流れてきます。

まるでこのAPIは、情報の波が次から次へと押し寄せてくるかの如くで、何か新しい可能性を感じさせるものでした。

Pythonから「ChirpUserStreams」を使う

Pythonからこの「ChirpUserStreams」を使ってみようと思い、標準ライブラリの非同期ソケットハンドラ「asyncore」を利用してみました。

このスクリプトは、Python 2.x系で動作するはずです。

simplejsonは、こちらのページのライブラリを利用しています。

スクリプトの一番下にあるUSER_ACCOUNTとPASSWORDの文字列を、自分のものに置き換えて利用してください。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
import asyncore, socket
import base64
import simplejson

class TwitterStreamer(asyncore.dispatcher):
    HOST = 'chirpstream.twitter.com'
    PATH = '/2b/user.json'

    def __init__(self, user, password):
        asyncore.dispatcher.__init__(self)
        basic_auth = base64.b64encode('%s:%s' % (user, password))
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((self.HOST, 80))
        self.send_buf = 'GET %s HTTP/1.0\r\n' % (self.PATH)
        self.send_buf += 'Authorization: Basic %s\r\n' % (basic_auth)
        self.send_buf += '\r\n'
        self.recv_buf = ''
        self.http_header_flag = True

    def handle_connect(self):
        pass

    def handle_close(self):
        self.close()

    def handle_read(self):
        buf = self.recv(10240)
        for line in buf.splitlines(True):
            if self.http_header_flag: # http header
                line = line.strip()
                if '' == line:
                    sys.stdout.write('\n')
                    self.http_header_flag = False
                else:
                    sys.stdout.write(line + '\n')
            else: # parse stream
                line = line.strip()
                if '' == line:
                    return
                self.recv_buf += line
                if '{' == self.recv_buf[0] and '}' == self.recv_buf[-1:]:
                    # json data is parsed by simplejson
                    array = simplejson.loads(self.recv_buf)
                    # both user and text data are prepared to print
                    user = ''
                    text = ''
                    if array.has_key('user'):
                        if array['user'].has_key('screen_name'):
                            user = array['user']['screen_name']
                    if array.has_key('text'):
                        text = array['text']
                    # print both user and text data
                    if '' != user and '' != text:
                        sys.stdout.write('%s:%s\n' % (
                                user.encode('utf-8'), text.encode('utf-8')))
                    # reset stored data
                    self.recv_buf = ''
                else:
                    # sometimes, json data is divided into 2 packets
                    # so, uncompleted data should be stored
                    self.recv_buf += line

    def writable(self):
        return (len(self.send_buf) > 0)

    def handle_write(self):
        sent = self.send(self.send_buf)
        self.send_buf = self.send_buf[sent:]

if '__main__' == __name__:
    ts = TwitterStreamer('USER_ACCOUNT', 'PASSWORD')
    asyncore.loop()

スクリプトを止めるときは、Ctrl+Cなどで停止させてください。

ちなみに、このスクリプトBASIC認証を利用していますが、Twitterの認証方式は6月末あたりにBASIC認証を廃止します。

そのため、このスクリプトにも有効期限がありますが、このAPIを試しに使ってみて、「ChirpUserStreams」の効果を感じてください。

スクリプト実行結果

このスクリプトを実行することで、コンソール上に次から次へとつぶやき情報が表示されます。

こんな少しのスクリプトで、大量の情報の流れを手に入れられるなんて、素晴らしい時代になりましたね。