| # Copyright (C) 2010 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Sample for threading and queues. |
| |
| A simple sample that processes many requests by constructing a threadpool and |
| passing client requests by a thread queue to be processed. |
| """ |
| from apiclient.discovery import build |
| from apiclient.errors import HttpError |
| from oauth2client.file import Storage |
| from oauth2client.client import OAuth2WebServerFlow |
| from oauth2client.tools import run |
| |
| import Queue |
| import gflags |
| import httplib2 |
| import logging |
| import sys |
| import threading |
| import time |
| |
# Size of the worker thread pool.
NUM_THREADS = 3

# Long URLs that the sample submits for shortening, one request per entry.
BULK = [
    "https://code.google.com/apis/buzz/",
    "https://code.google.com/apis/moderator/",
    "https://code.google.com/apis/latitude/",
    "https://code.google.com/apis/urlshortener/",
    "https://code.google.com/apis/customsearch/",
    "https://code.google.com/apis/shopping/search/",
    "https://code.google.com/apis/predict",
    "https://code.google.com/more",
]
| |
# gflags registry; main() calls it on argv to parse the command line.
FLAGS = gflags.FLAGS

# --logging_level selects the level set on the root logger in main().
gflags.DEFINE_enum(
    'logging_level',
    'ERROR',
    ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
    'Set the level of logging detail.')

# OAuth 2.0 flow used by main() when no valid stored credentials exist.
# NOTE(review): the client id and secret are hard-coded in this sample;
# confirm they are meant to be public before reusing this code.
FLOW = OAuth2WebServerFlow(
    client_id='433807057907.apps.googleusercontent.com',
    client_secret='jigtZpMApkRxncxikFpR+SFg',
    scope='https://www.googleapis.com/auth/urlshortener',
    user_agent='urlshortener-cmdline-sample/1.0')

# Work queue shared by main() (which puts requests) and the worker threads
# started by start_threads() (which get and execute them).
queue = Queue.Queue()
| |
| |
class Backoff(object):
    """Exponential backoff helper for a retry loop.

    Use loop() as the condition of the retry loop and call fail() each
    time a request fails. The first loop() call always returns True;
    later calls return True only while fewer than ``maxretries`` failures
    have been recorded. fail() records a failure and, if another attempt
    will follow, sleeps 2 ** retry seconds first.
    """

    def __init__(self, maxretries=8):
        """Create a backoff tracker.

        Args:
          maxretries: int, number of recorded failures after which loop()
            stops returning True.
        """
        self.retry = 0
        self.maxretries = maxretries
        self.first = True

    def loop(self):
        """Return True if the caller should make (another) attempt."""
        if self.first:
            self.first = False
            return True
        return self.retry < self.maxretries

    def fail(self):
        """Record a failed attempt and delay before the next one.

        The delay doubles with each failure (2, 4, 8, ... seconds). Once
        the retry budget is used up the next loop() call returns False, so
        no delay is applied: sleeping would only postpone giving up.
        """
        self.retry += 1
        # Sleep only when loop() would allow another attempt afterwards.
        if self.first or self.retry < self.maxretries:
            time.sleep(2 ** self.retry)
| |
| |
def start_threads(credentials):
    """Create the thread pool to process the requests.

    Starts NUM_THREADS daemon threads. Each thread authorizes its own
    httplib2.Http object with ``credentials`` and then repeatedly takes a
    request from the module-level ``queue``, executes it, and marks it
    done on the queue.

    Args:
      credentials: object whose authorize(http) method returns the http
        object to execute requests with.
    """
    # HTTP status codes that are retried with exponential backoff.
    retriable_statuses = (402, 403, 408, 503, 504)

    def process_requests(n):
        """Worker loop for thread number ``n``."""
        http = httplib2.Http()
        http = credentials.authorize(http)
        keep_running = True

        while keep_running:
            request = queue.get()
            backoff = Backoff()
            while backoff.loop():
                try:
                    response = request.execute(http)
                    print("Processed: %s in thread %d" % (response['id'], n))
                    break
                except HttpError as e:
                    if e.resp.status not in retriable_statuses:
                        # Give up on this request. Without this break no
                        # failure is recorded, backoff.loop() stays True,
                        # and the request is retried forever with no delay.
                        print("Request failed, got status code: %d"
                              % e.resp.status)
                        break
                    print("Increasing backoff, got status code: %d"
                          % e.resp.status)
                    backoff.fail()
                except Exception as e:
                    # Any other error stops this worker after the current
                    # request has been marked done below.
                    print("Unexpected error. Exiting." + str(e))
                    keep_running = False
                    break

            print("Completed request")
            # Always mark the item done so queue.join() can return.
            queue.task_done()

    for i in range(NUM_THREADS):
        t = threading.Thread(target=process_requests, args=[i])
        # Daemon threads do not keep the process alive once main() returns.
        t.daemon = True
        t.start()
| |
| |
def main(argv):
    """Shorten every URL in BULK using the worker thread pool.

    Parses the command-line flags, loads stored OAuth credentials from
    'threadqueue.dat' (running FLOW when none are stored or they are
    invalid), starts the worker threads, queues one shorten request per
    URL in BULK, and blocks until the queue reports all requests done.

    Args:
      argv: list of str, the command line including the program name.
    """
    try:
        argv = FLAGS(argv)
    except gflags.FlagsError as e:
        # The literal previously used '\\n', which printed a backslash and
        # an 'n' instead of starting a new line.
        print('%s\nUsage: %s ARGS\n%s' % (e, argv[0], FLAGS))
        sys.exit(1)

    logging.getLogger().setLevel(getattr(logging, FLAGS.logging_level))

    storage = Storage('threadqueue.dat')
    credentials = storage.get()
    if credentials is None or credentials.invalid:
        credentials = run(FLOW, storage)

    start_threads(credentials)

    http = httplib2.Http()
    http = credentials.authorize(http)

    # NOTE(review): the developer key is hard-coded in this sample.
    service = build("urlshortener", "v1", http=http,
                    developerKey="AIzaSyDRRpR3GS1F1_jKNNM9HCNd2wJQyPG3oN0")
    shortener = service.url()

    for url in BULK:
        body = {"longUrl": url}
        shorten_request = shortener.insert(body=body)
        print("Adding request to queue")
        queue.put(shorten_request)

    # Wait for all the requests to finish
    queue.join()


if __name__ == "__main__":
    main(sys.argv)